I was going to post this to my blog but I'm running into technical difficulties at the moment (don't ask), so I figured I'd just post it here and see if anyone has any feedback.
I recently wrote an implementation of an algorithm in Pig which exposed some bugs / design flaws in the Hadoop core that are addressed in other MapReduce implementations. It exposed some bugs in the Pig optimizer too, but at the core these were problems with Hadoop. Specifically, if you have already run one MapReduce job, and the data is all nicely partitioned across the cluster, your next MapReduce job cannot do an efficient merge with the previous data set. In the case of Pig it takes the previous output and does a full rejoin: sorting the data again, shuffling it to other nodes, and re-grouping it.

Twister <http://www.iterativemapreduce.org/> and Map Reduce Merge <http://www.systems.ethz.ch/education/past-courses/hs08/map-reduce/reading/mapreducemerge-sigmod07.pdf> were designed to address these problems.

Twister was specifically designed for iterative applications. From the Twister paper:

"For example, if we consider K-means clustering algorithm[16], during the nth iteration the program uses the input data set and the cluster centers computed during the (n-1)th iteration to compute the next set of cluster centers. To support map/reduce tasks operating with these two types of data products we introduced a 'configure' phase for map and reduce tasks, which can be used to load (read) any static data at the map and reduce tasks."

(I've sketched what the Hadoop-side equivalent of this pattern looks like below.)

Twister is interesting in that it's a 'micro-mapreduce' implementation (which I mean as a compliment) and is only about 11k lines of code. Basically it consists of a controller which you use to partition your input across N machines. The N machines then act as static partitions, so that when you run your next MapReduce round the data is already partitioned and sitting on local disk next to you. This means that if your data has little skew, it will all be nicely balanced across the cluster and you can do a local, map-side merge join with the data that is sitting on disk.

With Hadoop and HDFS you can't do this. The output from your reducer is written to HDFS, which places one copy on the local node, but the extra block replicas are just handed out to random nodes. Since the placement of the partitions isn't deterministic, there is no way to route the next MapReduce job so that its tasks and the data from the previous job's partitions land on the same machines. This basically means that you only have a nr_replicas / N probability that any given read will be local. If nr_replicas is 2-3 and N is something like 1,000 machines, the probability that you're doing a local read is very low (3 / 1000, or about 0.3%).

The Twister paper notes that their failure model isn't very sophisticated. Basically they have a controller and on failure you just roll back an iteration of the MR job:

"Our approach is to save the application state of the computation between iterations so that in the case of a failure the entire computation can be rolled back few iterations. Supporting individual map or reduce failures require adopting an architecture similar to Google, which will eliminate most of the efficiencies that we have gained using Twister for iterative MapReduce computations."

This seems like a non-starter for many applications. If your computation requires hundreds or thousands of nodes you're going to roll back often. The expected time between failures across the cluster is roughly the MTBF of a single node divided by N, so the more nodes you have, the more often your Twister jobs will be crashing and re-running iterations.
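To make the K-means example concrete, here is a minimal sketch of what the "load static data at configure time" pattern looks like on the Hadoop side: the (n-1)th iteration's centers are read once in setup(), and each map() call only sees the point data. This is my own illustration, not code from Twister or the paper; the kmeans.centers.path property, the comma-separated file format, and the class names are assumptions.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class KMeansMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        // Cluster centers from the (n-1)th iteration, loaded once per task.
        private final List<double[]> centers = new ArrayList<double[]>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The "configure" phase: read the static side data before any map() calls.
            // "kmeans.centers.path" is a made-up job property pointing at the previous
            // iteration's output (one comma-separated center per line).
            Path path = new Path(context.getConfiguration().get("kmeans.centers.path"));
            FileSystem fs = path.getFileSystem(context.getConfiguration());
            BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path)));
            try {
                String line;
                while ((line = in.readLine()) != null) {
                    centers.add(parse(line));
                }
            } finally {
                in.close();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            double[] point = parse(value.toString());
            int nearest = 0;
            double best = Double.MAX_VALUE;
            for (int i = 0; i < centers.size(); i++) {
                double d = 0.0;
                for (int j = 0; j < point.length; j++) {
                    double diff = point[j] - centers.get(i)[j];
                    d += diff * diff;
                }
                if (d < best) { best = d; nearest = i; }
            }
            // Emit (cluster id, point); the reducer averages the points per cluster
            // to produce the nth iteration's centers.
            context.write(new IntWritable(nearest), value);
        }

        private static double[] parse(String line) {
            String[] parts = line.split(",");
            double[] v = new double[parts.length];
            for (int i = 0; i < parts.length; i++) {
                v[i] = Double.parseDouble(parts[i].trim());
            }
            return v;
        }
    }

Note that this only covers the static-side-data half of the problem: the main input still gets read and shuffled from wherever HDFS happened to place it, which is exactly the locality problem above.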
I don't think this design is impossible to fix though. It seems that sharding HDFS could be the solution here.

Basically, instead of using random block balancing, you pair block replication to only 2-3 nodes. This has its drawbacks. With the random approach, if a node fails you can re-read its blocks from N-1 other nodes, which means each source node only has to serve one HDFS block chunk, so you never saturate a source's ethernet port or put too high a load on a single box. With a sharded approach, your HDFS reads are only going to come from two boxes: the other two machines in the same shard.

However, with temporary data you might just be able to skip the block replication, since it might not matter: if the data will be discarded in the next iteration anyway, being down to 2 replicas might not be the end of the world.

Another issue is how to run your reduce jobs if these boxes form one partition. Should the entire partition be one reduce job, or should you run three reduce jobs per shard? It might be possible to just run three reduce jobs but replicate the data to all nodes in the shard. Then, when the next iteration's mappers run, they can read from the three partitions and do the join and final reduction there. I think the trade-off would be worth it: doing it this way might be a slight performance hit, but only using 1/3rd of your hardware for reducers would be a MAJOR hit.

Map Reduce Merge

The Map Reduce Merge paper goes a step beyond this and argues that we need to stop thinking in Map-Reduce and instead start to think in Map-Reduce-Merge:

"We improve Map-Reduce into a new model called Map-Reduce-Merge. It adds to Map-Reduce a Merge phase that can efficiently merge data already partitioned and sorted (or hashed) by map and reduce modules. We also demonstrate that this new model can express relational algebra operators as well as implement several join algorithms. ... In this new model, the map function transforms an input key/value pair (k1,v1) into a list of intermediate key/value pairs [(k2,v2)]. The reduce function aggregates the list of values [v2] associated with k2 and produces a list of values [v3], which is also associated with k2. Note that inputs and outputs of both functions belong to the same lineage, say α. Another pair of map and reduce functions produce the intermediate output (k3,[v4]) from another lineage, say β. Based on keys k2 and k3, the merge function combines the two reduced outputs from different lineages into a list of key/value outputs [(k4,v5)]. This final output becomes a new lineage, say γ. If α = β, then this merge function does a self-merge, similar to self-join in relational algebra. Notice that the map and reduce signatures in the new model are almost the same as those in the original Map-Reduce. The only differences are the lineages of the datasets and the production of a key/value list from reduce instead of just values. These changes are introduced because the merge function needs input datasets organized (partitioned, then either sorted or hashed) by keys and these keys have to be passed into the function to be merged. In Google's Map-Reduce, the reduced output is final, so users pack whatever needed in [v3], while passing k2 for next stage is not required."

I think this might be a better way to look at the problem. Hadoop and Pig are both really good at bulk processing single iterations of data, but any complicated merge requires more work. Forcing this concept into the platform means that it's now a first-class citizen and isn't overlooked in the design.
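Here's a tiny in-memory sketch of how I read that merge phase: two lineages, each already reduced and keyed, get combined by a user-supplied merge function without another shuffle or sort. The names (Merger, mergePhase) and the TreeMap-based walk are my own illustration of the idea, not code from the paper.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    public class MergeSketch {

        // Roughly the merge signature from the paper: given a key plus the reduced
        // value lists from lineage alpha and lineage beta, produce the merged
        // output values for that key (the new lineage gamma).
        interface Merger<K, VA, VB, VO> {
            List<VO> merge(K key, List<VA> alphaValues, List<VB> betaValues);
        }

        static <K, VA, VB, VO> SortedMap<K, List<VO>> mergePhase(
                SortedMap<K, List<VA>> alpha,   // reduced output of lineage alpha, keyed by k2
                SortedMap<K, List<VB>> beta,    // reduced output of lineage beta, keyed by k3
                Merger<K, VA, VB, VO> merger) {
            SortedMap<K, List<VO>> gamma = new TreeMap<K, List<VO>>();
            // Both inputs arrive already partitioned and keyed, so we just walk
            // alpha in key order and probe beta -- no re-shuffle, no re-sort.
            for (Map.Entry<K, List<VA>> entry : alpha.entrySet()) {
                List<VB> other = beta.get(entry.getKey());
                if (other != null) {
                    gamma.put(entry.getKey(), merger.merge(entry.getKey(), entry.getValue(), other));
                }
            }
            return gamma;
        }

        public static void main(String[] args) {
            // Lineage alpha: order counts per customer id; lineage beta: customer names.
            SortedMap<Integer, List<Integer>> orders = new TreeMap<Integer, List<Integer>>();
            orders.put(1, Arrays.asList(3));
            orders.put(2, Arrays.asList(5));
            SortedMap<Integer, List<String>> names = new TreeMap<Integer, List<String>>();
            names.put(1, Arrays.asList("alice"));
            names.put(2, Arrays.asList("bob"));

            // A self-merge (alpha == beta) would be the self-join case from the paper.
            SortedMap<Integer, List<String>> joined = mergePhase(orders, names,
                    new Merger<Integer, Integer, String, String>() {
                        public List<String> merge(Integer key, List<Integer> counts, List<String> who) {
                            return Arrays.asList(who.get(0) + ":" + counts.get(0));
                        }
                    });
            System.out.println(joined);   // {1=[alice:3], 2=[bob:5]}
        }
    }

The precondition is the whole point: the merge is only cheap because the two reduced outputs arrive already partitioned and sorted on the same keys, which is exactly the property that random HDFS block placement throws away between jobs.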
--
Founder/CEO Spinn3r.com
Location: San Francisco, CA
Skype: burtonator
Skype-in: (415) 871-0687