How (well) does MongoDB integrate with Hadoop?

Feb 15, 2013
Alex Piggott

Overview

Here at IKANOW, we are big fans of MongoDB, for reasons we’ve written about before: it has decent performance, scales sensibly, and above all its schema-less JSON approach makes it a joy to develop with, and ensures easy integration with a host of other modern technology platforms.

One area where MongoDB is less well developed than many NoSQL alternatives is in its support for Hadoop. This is not surprising, since technologies like HBase and Accumulo are in fact built on top of the HDFS/Hadoop stack.

It should be noted that the combination of MongoDB’s query language and JSON integration already enables many of the functions Hadoop is used for, like data transformation. The latest MongoDB release also includes an aggregation framework that is comparable to Hive, and will be sufficient for many simpler analytics (here is a nice example of a typical use case).

However, especially given the complexity of documents MongoDB is capable of storing, there is much scope for analysis more complex than can be expressed in either the aggregation framework or MongoDB’s built-in map/reduce framework (at least maintainably). Examples of such analytics that we have developed for the semi-structured sources we typically store include:

  • Clustering documents based on the entities (eg proper nouns) they contain.
  • Graph analysis for recommending new sales leads based on their links with existing leads
    • (or equivalently for detecting potential fraud networks in business!)
  • Generating risk scores for public profiles in social networks.

For these more sophisticated algorithms, Hadoop is our (and many others’) preferred platform:

  • It has huge community support, including integration with powerful analytic libraries and languages like R and Mahout.
  • It scales well.
  • Its primary language is Java, which sits in a nice intersection of decent developer base, performance, maintainability, and ease-of-use.

It is beyond the scope of this post to describe Hadoop’s goals and architecture in detail (there is no shortage of material!); in essence it has three themes:

  • distributed (and redundant) storage
  • distributed processing, providing scalability as the amount of required processing increases (eg because of increasing data volumes) and also…
  • data locality, where the data is processed on one of the nodes on which the data resides, minimizing I/O.
    • (data locality becomes less important the more processor intensive the application)

Outline of this post

The remainder of this post (aka the technical aka the fun bits!) takes a brief theoretical and empirical look at how MongoDB’s replica/sharding/indexing architecture fits with Hadoop, how the current connector works, and possible improvements to what is currently publicly available with MongoDB. Specifically we will look at:

  • Analyzing the Default Mongo-Hadoop Connector
  • Data Locality
  • Run Times Based on Data Quantity
  • Run Time Improvement for Smaller Queries
  • A More General Solution
  • HDFS
  • Query-Specific Splits
  • Converting Queries to Updates
  • Conclusion

UPDATE (8 March 2013): I’ve added a few updates based on a couple more days of investigations in the run-up to discussing this at the Baltimore MongoDB meetup (link to slides from presentation), and also based on those meetup discussions.

Analyzing the default MongoDB-Hadoop connector

10gen publish a MongoDB-Hadoop connector on github. It is described here, but basically it spawns one mapper/split per user-specified number of chunks (chunks which MongoDB has already created as part of its sharding management). This is illustrated below.

The connector allows the submitter to specify a query that is applied by each mapper to the chunks it is assigned (also shown in the diagram below). This is very useful for a number of applications:

  • Providing a security layer.
  • Basic selection of the analysis dataset, eg by date/source.
  • More sophisticated slicing of the data, eg “all documents within X km of this location”, “referencing this person”, combinations thereof, etc.

Default Connector

The above figure shows clearly how the processing is distributed.

Data Locality

What about data locality? Well, the existing code makes no attempt to identify or take advantage of any overlap between Hadoop nodes and MongoDB nodes, but we’re more interested in what can easily be added than in what is currently present.

It should first be noted that the MongoDB and Hadoop clusters are logically completely separate: each server can belong to either or both clusters, as illustrated below.
Mongo Nodes
(Co-locating MongoDB and Hadoop nodes can often make sense since Hadoop jobs typically don’t use much memory and MongoDB is normally I/O bound not CPU bound, particularly on multi-core servers).

It is important to observe that whenever the data is not evenly distributed across the mapper nodes (often the case when a user query is specified), there is a case for processing data non-locally rather than waiting for local resources to become available (unless the cluster is loaded with many jobs, making it likely that the non-local nodes would be used by other jobs anyway). For clusters with more Hadoop nodes than MongoDB nodes (eg due to API/application servers), once the data-local nodes are full, splits can be allocated immediately to the “no data” nodes.

So, where desired, adding data locality is straightforward: the “mongos” in the first figure knows about all the shards, and the splitter knows about all the mapper nodes, so ensuring a user-configurable number of splits are assigned to co-located mapper nodes first will be straightforward. Replica sets can be taken into account in a similar way.
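To make the idea concrete, here is a minimal sketch of that greedy assignment: fill co-located mapper slots first, then spill the remainder onto the “no data” nodes. This is purely illustrative Python (the real splitter would be Java, getting the shard-to-host mapping from “mongos” and the mapper hosts from Hadoop); `assign_splits` and its inputs are hypothetical names.

```python
# Hypothetical sketch of locality-aware split assignment. The real connector
# would learn shard->host placement from "mongos" and slots from Hadoop;
# here both are passed in directly. Oversubscription is ignored for brevity.
def assign_splits(splits, mapper_hosts, slots_per_host=2):
    """splits: list of (split_id, shard_host) pairs.
    Returns {split_id: assigned_mapper_host}."""
    free = {h: slots_per_host for h in mapper_hosts}
    assignment, remote = {}, []
    # First pass: place each split on the host holding its chunk, if possible.
    for split_id, shard_host in splits:
        if free.get(shard_host, 0) > 0:
            assignment[split_id] = shard_host
            free[shard_host] -= 1
        else:
            remote.append(split_id)
    # Second pass: spill the rest onto whichever host has the most capacity
    # (these are the "no data" nodes, or data nodes whose slots remain).
    for split_id in remote:
        host = max(free, key=free.get)
        assignment[split_id] = host
        free[host] -= 1
    return assignment
```

A replica-set-aware version would simply treat every replica holding the chunk as an acceptable “shard_host” in the first pass.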

Run Times Based on Data Quantity

Now let’s look at how the existing code performs when the query returns only a subset of the collection. The table below shows how the run times are affected by reducing the amount of data processed, using a simple indexed query (5 cores across 3 lightly used servers with plenty of available I/O):

| Method | Query select % | Records processed | Splits created | CPU time spent in mappers | Total job time |
| --- | --- | --- | --- | --- | --- |
| Default | 81% | 1.3M | 228 | 36 minutes | 1hr 5 minutes |
| Default | 43% | 700K | 228 | 21 minutes | 1hr 36 minutes |
| Default | 4% | 71K | 228 | 2 minutes 30 seconds | 57 minutes |

UPDATE (8 March 2013): Many of the performance problems described in this post, starting with the large “total job times”, were due to the databases needing to be compacted. After compaction the same jobs described above took ~10 minutes on ext3 and ~5 minutes on ext4. The same scaling characteristics were observed however – adding a query resulted in no time gain.

Although the CPU time spent in mappers scales well, the overall time does not change much, because the same number of splits are being generated each time. This is illustrated by the figure below.

Default Connector

The red bars in the chunks indicate blocks of documents that match the query (in practice it will be even more distributed), and the white mappers indicate ones that have been passed no data (in practice even many of the mappers that are passed data are very lightly utilized, indicated in the diagram by a lighter shade of blue). The overhead from all those empty/mostly empty splits prevents the overall time from scaling.

Our testing also uncovered another important issue: although our query was indexed, it did not include the collection’s shard key (which happens to be “_id” for us). So although an initial query from the “InputFormat” to the “mongos” would be fast, the mapper components combine the original query with a shard key (“_id”) range query. As a result, each mapper will either perform the original query across the entire “_id” range, or the range query across the entire dataset matching the original query. This is illustrated in the diagram below, and discussed further later in the post.

Default Connector

A simple improvement for smaller queries

For the application we were building, each individual map/reduce job acts on a small number of documents, approximately 50K on average. We investigated the performance of “skip()” (which presumably just steps through the “_id” index on the server, ie is very fast but scales “badly” — that is, linearly — with the skip offset). Running MongoDB under a moderate load on an Amazon EC2 m1.xlarge instance with a 4x RAID-0 array, we got the following results:

time mongo doc_metadata --eval 'db.metadata.find({},{"_id":1}).skip(SKIP).limit(1).forEach(function(x){printjson(x)});'
  • skip(10000): 0.135s
  • skip(100000): 3.55s
  • skip(500000): 12.6s
  • skip(1000000): 25.7s

Obviously these numbers will vary from instance to instance (eg under less load, during my original experiments 6 months ago, the 500K skip was only 2s), but the “shape” will stay the same.

Note that although “skip()” is linear, the overall performance will in fact be O(N squared), because the number of mappers will increase linearly with the number of documents, and each mapper will have linear performance because of the “skip()” function.
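To make the quadratic behaviour concrete: with D documents per split, mapper i must skip past i*D documents, so the total work skipped across all ceil(N/D) mappers is D*(0+1+…+M-1), which grows with N squared. A tiny illustrative snippet (hypothetical helper, not connector code):

```python
def total_skip_work(n_docs, docs_per_split):
    """Total documents stepped over by all mappers' skip() calls."""
    m = -(-n_docs // docs_per_split)  # ceiling division: number of mappers
    # mapper i is given skip(i * docs_per_split)
    return sum(i * docs_per_split for i in range(m))

# Doubling the collection roughly quadruples the skip overhead:
# total_skip_work(100_000, 12_500) -> 350_000
# total_skip_work(200_000, 12_500) -> 1_500_000
```

This is why the extension below caps the number of mappers and falls back to the default connector for larger result sets.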

Based on this we developed a very simple extension to handle tasks within our application’s typical performance envelope:

  • The Hadoop JAR specifies the maximum number of documents per split, and the maximum number of mappers (splits) to generate (eg we used 8 mappers and 12,500 documents per mapper).
  • The InputFormat component creates the minimum number of mappers based on doing a count with the original query, giving each mapper a skip parameter.
    • (If the results of the query would generate too many mappers then the default connector code is used instead)
  • Each mapper then performs the original query with the specified skip/limit
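Sketched in Python (hypothetical — the real InputFormat is Java; the 12,500-documents/8-mappers defaults are the ones mentioned above), the split generation amounts to:

```python
import math

def make_skip_limit_splits(matching_count, docs_per_split=12500, max_splits=8):
    """matching_count: result of a count() with the original user query.
    Returns a (skip, limit) pair per mapper, or None to signal falling
    back to the default chunk-based connector."""
    n_splits = max(1, math.ceil(matching_count / docs_per_split))
    if n_splits > max_splits:
        return None  # query matches too many documents for this approach
    return [(i * docs_per_split, docs_per_split) for i in range(n_splits)]
```

Each mapper then issues the original query with its own `.skip(skip).limit(limit)` appended.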

This is illustrated below, and worked very well. Example results, compared to the default method:

| Method | Query select % | Records processed | Splits created | CPU time spent in mappers | Total job time |
| --- | --- | --- | --- | --- | --- |
| Skip/limit | 5% | 77K | 7 | 2 minutes 26 seconds | 43 seconds |
| Default | 4% | 71K | 228 | 2 minutes 30 seconds | 57 minutes |

Our extensions to MongoInputFormat are released as open source under the Apache License and are available here. The configuration requirements are described in a post here.

Default Connector

As can be seen in both the diagram and the table, fewer mappers are generated and they are all fully utilized. Note one other difference compared to the default connector: the mappers have to go through the mongos components, because chunk/shard boundaries are no longer respected. This is assumed to degrade performance, but not by much. (2 “mongos” are shown because we use a fairly standard Hadoop cluster configuration of 2 mappers per dual-core CPU.) One easy improvement (that has not yet been necessary for us) would be to treat each shard individually, with each mapper getting assigned a shard as well as a skip/limit pair.

Currently this approach is also slightly more sensitive to “bad” queries: with the default approach, because of the “_id” range, a bad query traverses the database at worst once (leaving aside swap issues from the chunks not being traversed in order necessarily). With this approach a bad query is made once per split. To productionize this code vs arbitrary queries, some query analysis logic (eg with “explain()“) should be used (again, backing out to the default where advisable).

A more general solution

Although this extension solved our short term needs, there are clearly some issues with this simple approach: eg because of its use of the linear “skip()” function, resulting in “O(N squared)” performance, it does not scale well. Even with the multi-shard improvement mentioned above, there will be a significant envelope where the query filters enough documents for the default connector to be ineffective, but where enough documents are returned by the filter for the extension to perform badly.

  • Eg if each shard contains 20M documents, then any query that returns <5M documents per shard but >250K is likely to fall in between the 2 existing methods. The actual envelope will of course vary from case to case.

What we would ideally like is an approach that fits in between the 2 existing connectors (and can support data locality). The remainder of this section describes what this might look like.

We briefly consider 3 alternatives:

  1. Dump matching documents to HDFS and then use Hadoop/HDFS as normal, ie not using any MongoDB specific connector at all.
  2. Use a compound of query key with the shard key, and create custom “query-specific” splits.
  3. Turn the query into an “update” on a sparsely indexed field, and then optimize the connector to use this field.

HDFS

Since the focus of this post is on manipulating indexes in MongoDB, we’re not going to spend much time on this, even though (disappointingly!) it might be the most promising type of approach.

It does have a few downsides – eg although it scales nicely, the amount of up-front I/O required for larger queries will be expensive. I did a quick test using “mongodump” with a large query (~40% of the DB): after 6 minutes its progress was still showing 0%, despite 1.1GB having been written into one big file. This suggests at least that a custom export client would be needed. (“mongoexport” took 2.5 minutes to get to 1.1GB. In another experiment, restricting “mongoexport” to output only the “_id” field, it took 10 minutes to output 200K records.)

UPDATE (8 March 2013):  These issues were likely compaction-related also. On ext4 using “mongodump” on an idle database it took 2.5 minutes to export 1.5M records (~15GB). Of course in practice, MongoDB clusters are often IO-limited so adding another read+write cycle may not work well unless the HDFS and MongoDB partitions can use different physical channels.

One potential upside of avoiding HDFS: the Java Security Manager is incompatible with Hadoop, and there are no plans to fix it, but it looks from the discussion like HDFS is the main impediment. Being able to enforce security restrictions on Hadoop JARs better than Hadoop does would be very powerful for multi-user platforms like Infinit.e that support plugins.

UPDATE (8 March 2013):  Another benefit of (at least partial use of) HDFS came up in the meetup discussion – the current MongoInputFormat is difficult to integrate with many 3rd party frameworks.

“Query-specific” splits

We saw that one big issue with the default connector was that it generated the splits independently of the query. The obvious way to solve this is to generate query-specific splits! The other issue was that the user query needed to be combined with the shard key index – fortunately MongoDB supports exactly this sort of compound index, and even better, the compound index will solve most of both issues. Note that our large collections happen to be sharded by “_id”, so we’ll proceed on that assumption; the following logic will hold for any shard key that is “reasonably unique”. The case where the shard key has a small number of distinct values is outside the scope of this post.

One initial worry I had was that a compound index would take up significantly more space, but that actually doesn’t seem to be an issue. For example, considering a single shard with 10M documents each of ~20KB in size, with a query on an indexed field with 140K distinct values:

db.metadata.stats()
"indexSizes" : {
         "_id_" : 279463856, // (_id has 10M distinct values)
         //...
         "url_1_sourceKey_1" : 1281915040,
         "sourceKey_1" : 628292896, // (sourceKey has 140K distinct values)   
         "sourceKey_1__id_-1" : 867923280
 }

Compare the last 2, which are identical apart from the secondary “_id” term – they are not very different in size. In fact, both on the replicas of this shard, and on a completely different cluster with a shard of ~3M documents and only ~500 unique “sourceKey” values, the compound index is actually slightly smaller. (Presumably the overhead caused by “munging” the two keys together is somewhat offset by the fact that there can be only 1 document per combined index entry.) Note that just looking at the sizes doesn’t necessarily give the whole picture when it comes to query efficiency; that sort of thing is difficult to quantify, but empirically I haven’t noticed any difference between the two.

Once the compound index is in place, the next step is for the InputFormat component to generate the mappers/splits. Assume the user specifies a maximum number of documents D per mapper instance, and that the query returns N documents. Then we want to generate “ceiling(N/D)” mappers, with each one specified by an “_id” range. This is illustrated by the following diagram.

Default Connector

The process of building query-specific “_id” ranges is actually surprisingly difficult because of a MongoDB issue described here and in this JIRA issue (but basically: you can’t sort on the secondary index, eg “_id”, if the query on the primary index, eg “field”, involves a range; this will normally be the case for these sorts of queries).

You can sort on the compound index though, so an approximate version can be calculated with a single pass through the data by encoding the logic in a map-reduce (or in an eval or forEach() block; it’s not clear which would be fastest):

  • While the “field” index key remains the same, the “_id”s are ordered, so you can create “_id” ranges that contain (eg) 1/10th of the desired size.
  • Each time the “field” index changes, the next “_id” can potentially be out-of-order so you need to locate its position in the array of ranges. Then increment the number of documents in the corresponding “_id” range.
  • In the client, step once through the resulting array, merging “_id” ranges until they contain close to the desired number of documents.
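The bookkeeping above might look something like this — a hypothetical Python sketch with a synthetic, already-sorted input, not connector code. It keeps a sorted histogram of fine-grained “_id” subrange boundaries (locating out-of-order “_id”s by binary chop) and then merges them client-side up to the desired split size:

```python
import bisect

def build_id_ranges(docs, docs_per_split):
    """docs: (field, _id) pairs in compound-index order, ie sorted by
    (field, _id). Returns a list of (start_id, count) split ranges, each
    holding roughly docs_per_split matching documents."""
    sub = max(1, docs_per_split // 10)   # start with ~1/10th-sized subranges
    bounds, counts = [], []              # parallel arrays: boundary _id, count
    for _field, _id in docs:
        i = bisect.bisect_left(bounds, _id)   # binary chop to find subrange
        if i == len(bounds) or counts[i] >= sub:
            bounds.insert(i, _id)             # open a new subrange here
            counts.insert(i, 1)
        else:
            counts[i] += 1                    # fold into an existing subrange
    # client-side pass: merge adjacent subranges up to ~docs_per_split docs
    ranges, start, acc = [], None, 0
    for b, c in zip(bounds, counts):
        if start is None:
            start = b
        acc += c
        if acc >= docs_per_split:
            ranges.append((start, acc))
            start, acc = None, 0
    if acc:
        ranges.append((start, acc))
    return ranges
```

Every range except possibly the last ends up with at least docs_per_split documents, and the total across ranges equals the number of matching documents.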

I didn’t have time to implement this approach, so I knocked up a dummy algorithm that doesn’t do anything functionally, but should take a similar amount of time.

Dummy code for timing estimates:

SECONDARY> var a = [ [ "dummy", 0 ] ];
SECONDARY> var ncnt = 0;
SECONDARY> var savedfield = null;
SECONDARY> db.metadata.find({sourceKey:/^www/},{sourceKey:1,_id:1}).
...          hint({sourceKey:1,_id:-1}).sort({sourceKey:1,_id:-1}).
...          forEach(function(x){
...            ncnt++;
...            if (x.sourceKey != savedfield) { // (simulates re-finding the index range by binary chop)
...              savedfield = x.sourceKey;
...              i = 1; for (;i < a.length/4; i*=2);
...              a[i-1][1] = a[i-1][1] + 1;
...            } else if (ncnt >= 1250) {
...              a.push([x._id, ncnt]); ncnt = 0; // keep track of clump sizes
...            }
...          })

The above “dummy-but-representative” code took 1 minute to run on a query returning 4.5M documents (out of 10M), and should scale approximately linearly (actually O(N*log(N)); of course there will be some hard cap on the number of documents per shard anyway, probably before the log(N) term becomes significant).

Once the above unpleasantness has been overcome, it is straightforward to spawn the required number of mappers and pass each one an “_id” range to go with the original query. This is illustrated below:

Default Connector

Here only the required number of mappers are generated, and each mapper makes an indexed query to its shard. Because each mapper is assigned an “_id” range that is within a single shard, data locality can be easily enforced if desired.

Converting the “queries” to “updates”

This approach is actually very similar to the “query-specific” splits described above. We saw two unpleasant issues with that approach:

  • The additional logic that was required to handle queries involving ranges on the primary indexed field.
  • The fact that every queried field would need a compound index (with the shard key) of the type described above.

A candidate solution to this is to transform the user query into a single platform-level key (that is then the only thing that needs to have the compound index with the shard key). For example:

Transforming queries to updates
mongos> db.metadata.ensureIndex({mrSelect:1, _id:1}, {background:true, sparse:true})
mongos> db.metadata.update(USER_QUERY, {$push:{mrSelect:JOBID}}, false, true);

(Unfortunately the sparse option achieves nothing here, because the index is compound — see this issue. At least it can plausibly be hoped that the “mrSelect==null” portion of the index will mostly stay swapped out and not be too expensive.)

Then all the subsequent processing occurs just as in the previous example, except that the “user query” is replaced with “{ mrSelect: JOBID }” everywhere. Once the Hadoop job is finished, the array element can be removed.
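The moving parts are just three query/update documents — shown here as hypothetical Python helpers mirroring the shell example above (the “mrSelect” field name and job-id value are from that example):

```python
def tag_update(job_id):
    """The "$push" that marks every document matching the user query."""
    return {"$push": {"mrSelect": job_id}}

def job_query(job_id):
    """What each mapper runs in place of the original user query; fully
    covered by the compound {mrSelect: 1, _id: 1} index."""
    return {"mrSelect": job_id}

def cleanup_update(job_id):
    """Removes the marker once the Hadoop job has finished."""
    return {"$pull": {"mrSelect": job_id}}
```

With a modern pymongo one would, for example, call `db.metadata.update_many(USER_QUERY, tag_update(job_id))` before the job and `db.metadata.update_many(job_query(job_id), cleanup_update(job_id))` after it.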

One advantage of this approach (though it may matter less given the new MongoDB full text query capability, which I haven’t had a chance to play with yet): it makes it very easy to integrate the processing with text-scanning Lucene-based solutions. From within eg elasticsearch (our favored platform), scroll through the matching “_id”s in blocks, for each block call “update” with “{ '_id': { '$in': ID_ARRAY } }” as the query, and then proceed as before.

Unfortunately on a fairly heavily loaded database shard, the above update operation (on the query from the previous section, selecting 4.4M out of 10M documents) had still not completed after 20 minutes – this suggests at the very least that it is not a robust solution. In fact I then turned off everything using that database, and it still had not completed after a further 10 minutes. The IO (via “iostat”) showed as almost exclusively reads, “mongostat” showed no activity, and “top” showed lots of IOWAIT (100% of 1 core) but a small amount of CPU activity (<10%) shared between “mongod” and “kswapd” – so it’s not very clear what was happening.

(My main worry had been that pushing ObjectIds onto the documents would require that they all get relocated within the data files; my hope was that there would be enough padding to prevent that being necessary for most documents.)

One interesting hybrid idea would be to create files (eg on HDFS) compactly containing the “_id”s generated from the query, and then have the mappers atomically read/delete them in batches of (say) 1000 and perform “{ '_id': { '$in': [ ID_ARRAY ] } }” queries until there are no more left. This would remove the overhead of using the database for a relatively simple data exchange. And this could also decouple the “query scrolling” from the mappers (by having a separate server do the scrolling), reducing latency for large query responses.
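The batching side of that hybrid is trivial — a hypothetical sketch (`id_batches` is an illustrative name; in the hybrid, `ids` would be a block read atomically from the HDFS file rather than held in memory):

```python
def id_batches(ids, batch_size=1000):
    """Yield "$in" queries over successive blocks of _ids, as each mapper
    might consume them from a compact file of query results on HDFS."""
    for i in range(0, len(ids), batch_size):
        yield {"_id": {"$in": ids[i:i + batch_size]}}
```

Each yielded document is a fully indexed point query against the “_id” index, so no compound index is needed at all in this scheme.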

UPDATE (8 March 2013):  I later confirmed that there were 2 issues with the test:

  • the documents’ padding was insufficient therefore each update was causing the entire (10KB) object to be moved.
  • the database needed to be compacted, so IO was poor in general.

I didn’t rerun any timing tests, partly because I’ve cooled a little on this idea in the meantime (some related “hybrid” ideas are more promising).

Conclusions

This post performed some preliminary analysis on the MongoDB-Hadoop connector, described how we solved one performance issue, and speculated on related performance issues and approaches to solve them.

A very quick summary looks something like:

| Method | Development requirements | Performance | Issues/Further analysis |
| --- | --- | --- | --- |
| Default | Complete | Good if no query specified; decent if the query matches most of the database; terrible if the query is small; unknown in between | Improvements possible to support better data-locality and replicas |
| Skip/limit | Complete | Good for smaller queries (eg <100K matching documents); scales badly as the number of matching documents increases | Improvements possible to support better data-locality and replicas |
| Query-specific indexes | Medium-hard | Performance was OK on ~5M matching docs per shard, and should scale well above that. Will perform similarly to “Default” unless there is an indexed field in the query, and that index must have the shard key as a secondary term | Need to generate compound indexes for every query field; range-building algorithm not proven; only works if shard key is “_id”-like |
| Query transform | Medium-low (if feasible) | Did not appear to work at all on a ~5M matching document query | Not clear why it wasn’t working |
| Write to HDFS | Medium | Unknown; initial experiments suggested mongoexport/mongodump with a query were slow | Not clear why performance was so bad |

There is clear evidence that the default approach can be improved upon when the data to be processed is sub-selected by a query. There is some investigation to be done into some of the unexpected results encountered during the experiments performed for this blog post, but regardless the “query-specific indexes” approach seems like a promising avenue for further exploration.

It will likely be the case that different approaches are optimal for different algorithms, and a recommendation for 10gen, if they wanted to optimize the MongoDB-Hadoop experience, would be to put a better framework in place to help the community discover others’ extensions and/or publish their own.

There was also a description of how any of the above approaches could better support replicas and shards. This is not very complex to add to any of the approaches discussed.

Finally, note that many intermediate results and tangents did not make their way into this post, in an attempt to keep it focused and somewhat readable. Do contact us if you are interested in more details or discussion. I intend to present this at the February Baltimore MongoDB meetup, so come along to that if you’re in the neighborhood!

About the Author:

Alex Piggott is IKANOW’s Director of Product Development. Alex runs IKANOW’s research and development activities and leads the development team. He has 15 years of experience designing and developing complex real-time mission-critical distributed systems. In addition, Alex graduated from Oxford University with a first-class degree in Mathematics.

This post is also available as a free whitepaper.