[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13761624#comment-13761624
 ] 

Lijie Xu commented on MAPREDUCE-5494:
-------------------------------------

Cool, this should dramatically improve Hadoop's sort-based and disk-based
MapReduce framework and lead to better job performance.

As for this hash-based MapReduce, Apache Spark uses a similar hash-based 
GroupBy-Aggregate approach. Compared with Spark's implementation, this proposal 
may have some problems. 

1) At the map side, the only problem is that Combine() cannot be done easily.
For jobs with a Combine(), Spark uses a hashmap to collect the <K,V> pairs
emitted by map() directly, without a buffer. Combine() is performed as the
<K,V> pairs are put into the hashmap one by one, but this requires that the
semantics of Combine() be similar to fold() in a functional language. After
map() finishes, the hashmap outputs all the <K,V> pairs into different buffers
(one for each reducer) according to each pair's partition id. Then the <K,V>
pairs are kept in the buffers or spilled onto disk for further shuffling. The
drawback of Spark's approach is that the hashmap is memory-consuming, so it is
a trade-off between performance and fault-tolerance. For jobs without
Combine(), Spark uses your proposed method.
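
To make the fold()-style combine concrete, here is a minimal sketch (plain
Java rather than Spark's actual code; PartitionBuffer is a made-up stand-in
for the per-reducer buffers):

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Map-side hash aggregation: Combine() is folded in as each <K,V> arrives. */
public class HashCombineCollector<K, V> {
    private final Map<K, V> aggregates = new HashMap<>();
    private final BiFunction<V, V, V> fold;   // fold(oldValue, newValue) -> combined value

    public HashCombineCollector(BiFunction<V, V, V> fold) {
        this.fold = fold;
    }

    /** Called for every <K,V> emitted by map(); no intermediate sort buffer is needed. */
    public void collect(K key, V value) {
        aggregates.merge(key, value, fold);
    }

    /** After map() finishes, route each combined pair to its reducer's buffer. */
    public void flush(int numReducers, PartitionBuffer<K, V>[] buffers) {
        for (Map.Entry<K, V> e : aggregates.entrySet()) {
            int partition = (e.getKey().hashCode() & Integer.MAX_VALUE) % numReducers;
            buffers[partition].add(e.getKey(), e.getValue());
        }
    }

    /** Made-up interface for a per-reducer buffer that can spill to disk. */
    public interface PartitionBuffer<K, V> {
        void add(K key, V value);
    }
}
{code}

For word count, for example, fold would simply be Integer::sum; a Combine()
that cannot be expressed as such a fold cannot take this path.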

2) At the reduce side, I first want to question the sort-based MapReduce in
current Hadoop versions. Why do we have to shuffle all the map outputs to a
reducer before running reduce()? Since all the <K,V> pairs in the remote map
outputs are already sorted, reduce() could be performed on the fly, just like
Merge(). Maybe the reason is that it is not easy to iterate over <K,V> pairs
while the map outputs are compressed.
   Secondly, back to the proposed approach. "the hash shuffle will fetch its
own partitions from maps as usually" is not very clear. Maybe you want to do
the shuffle in this way: first, copy the remote map outputs and store the
shuffled segments in memory as usual. When a threshold is reached, iterate
over every record in every in-memory segment. While iterating, choose a
reasonably good hash function to partition each record and store it into the
corresponding disk file. After all the records have been written to disk,
reduce() starts loading each partition file into a hashmap. Once all the
records of a partition are cached in the hashmap, Reduce() is performed on
each <K, UnsortedList<V>>. The secondary-level partition guarantees that the
hashmap can be kept in memory.
   I think this approach is workable (a rough sketch of this flow is given
below). However, choosing the secondary partition function is tricky, and
Combine() is discarded as well. Another performance issue is that the
memory-to-disk and disk-to-memory back-and-forth copying is not efficient.
Spark uses a large hashmap to do "shuffle and combine()" simultaneously, as
described for the mapper's Combine() above, but this requires enough memory
to hold the hashmap. We can increase the number of reducers to lower this
requirement.
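
If I read the proposal correctly, the reduce-side flow would look roughly like
the sketch below (plain Java for illustration only; in-memory lists stand in
for the spilled sub-partition files, and serialization/compression are
ignored):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Sketch of the reduce-side flow described above: fetched records are first
 * split into small sub-partitions by a second hash function, then each
 * sub-partition is grouped in a hashmap and handed to reduce() as
 * <K, unsorted List<V>>.
 */
public class SecondaryPartitionDemo {

    public static <K, V> void hashReduce(Iterable<Map.Entry<K, V>> fetched,
                                         int numSubPartitions,
                                         BiConsumer<K, List<V>> reduce) {
        // Secondary-level partition: in the real plugin these would be disk files.
        List<List<Map.Entry<K, V>>> subPartitions = new ArrayList<>();
        for (int i = 0; i < numSubPartitions; i++) {
            subPartitions.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> rec : fetched) {
            int sub = (rec.getKey().hashCode() & Integer.MAX_VALUE) % numSubPartitions;
            subPartitions.get(sub).add(rec);
        }
        // Process one sub-partition at a time so the hashmap stays small.
        for (List<Map.Entry<K, V>> sub : subPartitions) {
            Map<K, List<V>> groups = new HashMap<>();
            for (Map.Entry<K, V> rec : sub) {
                groups.computeIfAbsent(rec.getKey(), k -> new ArrayList<>()).add(rec.getValue());
            }
            groups.forEach(reduce);   // reduce(key, unsorted list of values)
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> fetched = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        hashReduce(fetched, 4, (key, values) -> System.out.println(key + " -> " + values));
    }
}
{code}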

All in all, if we only change sort-based to hash-based, the job's performance
may not improve much. A better way is to combine the hash-based and
memory-based approaches, as Spark does: use memory aggressively (e.g., use a
large hashmap instead of the back-and-forth copying) and, if a memory error
occurs, fall back to the sort-based execution style.
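
A very rough sketch of that fallback idea, with a crude key-count budget
standing in for a real memory check and in-memory lists standing in for the
spilled sorted runs:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Aggregate in a hashmap; when the memory budget is exceeded, degrade to the
 *  sort-based style by writing out sorted runs and merging them at the end. */
public class HashThenSortFallback {
    private final int maxKeysInMemory;                                  // crude memory budget
    private final Map<String, Integer> inMemory = new HashMap<>();
    private final List<List<Map.Entry<String, Integer>>> sortedRuns = new ArrayList<>();

    public HashThenSortFallback(int maxKeysInMemory) {
        this.maxKeysInMemory = maxKeysInMemory;
    }

    /** Fold-style combine (a sum here); spills a sorted run when over budget. */
    public void insert(String key, int value) {
        inMemory.merge(key, value, Integer::sum);
        if (inMemory.size() > maxKeysInMemory) {
            List<Map.Entry<String, Integer>> run = new ArrayList<>();
            inMemory.forEach((k, v) -> run.add(Map.entry(k, v)));
            run.sort(Map.Entry.comparingByKey());   // the sort-based path would write this run to disk
            sortedRuns.add(run);
            inMemory.clear();
        }
    }

    /** Final result: combine the in-memory map with all spilled runs
     *  (a real implementation would stream a k-way merge over the sorted runs). */
    public Map<String, Integer> result() {
        Map<String, Integer> merged = new TreeMap<>(inMemory);
        for (List<Map.Entry<String, Integer>> run : sortedRuns) {
            for (Map.Entry<String, Integer> e : run) {
                merged.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return merged;
    }
}
{code}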

> Hash-based MapReduce
> --------------------
>
>                 Key: MAPREDUCE-5494
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5494
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: mrv2
>    Affects Versions: trunk
>            Reporter: Jerry Chen
>            Assignee: Jerry Chen
>   Original Estimate: 1,344h
>  Remaining Estimate: 1,344h
>
> To support parallel processing, the MapReduce computation model essentially
> groups data by key and then applies the reduce function to each group. The
> current implementation of the MapReduce framework uses sort-merge to
> guarantee this computation model. However, sort-merge is relatively expensive
> when only grouping is needed, and grouping is exactly what hashing is capable
> of.
>  
> We propose to implement hash-based MapReduce, which uses hashing instead of
> sort-merge to guarantee the computation model. This technique works as
> follows:
>  
> 1. At the map side, the hash map output collector collects the map output
> directly into the corresponding partition buffer (one for each reducer)
> without sorting (first-level partition). Each partition buffer can be flushed
> to disk when the buffer is full or close to full. To handle disk I/O
> efficiently when there are too many partitions (reducers), the map side can
> be optimized by using a shared buffer for different partitions. A counting
> sort on the partition number can be performed when flushing the shared buffer
> (see the illustrative sketch after this list).
> 2. At the reduce side, the hash shuffle fetches its own partitions from the
> maps as usual. While fetching, the records are further partitioned
> (secondary-level partition) by a universal hash function. By properly
> choosing the number of partitions, every single partition should fit into
> memory. For cases such as a heavily skewed key distribution, the size of a
> partition may still be too large to fit into memory. When this happens, a
> parameter controls whether we simply fail the job or try to further split
> the large partition into smaller ones using another hash function.
> 3. Once all the data has been fetched and partitioned at the reduce side,
> iteration starts. A RawKeyValueIterator is wrapped to process and iterate
> the partitions one by one. Processing a partition means loading it into
> memory and building a hash table; an iterator wrapped around the hash table
> then feeds reduce() with the groups of keys and values in the hash table.
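>
> A rough illustration of the counting sort in step 1 (illustrative code only;
> the actual collector would operate on serialized records in the shared byte
> buffer rather than on an array of objects):
>
> {code:java}
> import java.util.List;
>
> /** Orders buffered records by partition id in O(n + numPartitions), without comparing keys. */
> public class PartitionCountingSort {
>
>     public record Rec(int partition, byte[] key, byte[] value) {}
>
>     public static Rec[] sortByPartition(List<Rec> sharedBuffer, int numPartitions) {
>         int[] offsets = new int[numPartitions + 1];
>         for (Rec r : sharedBuffer) offsets[r.partition() + 1]++;              // count per partition
>         for (int p = 0; p < numPartitions; p++) offsets[p + 1] += offsets[p]; // prefix sums -> start offsets
>         Rec[] out = new Rec[sharedBuffer.size()];
>         for (Rec r : sharedBuffer) out[offsets[r.partition()]++] = r;
>         return out;   // records are now contiguous per partition, ready to flush as per-reducer segments
>     }
> }
> {code}
>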
> Although there are some JIRAs related to using hashing in MapReduce, the
> process proposed here has some fundamental differences from them.
> MAPREDUCE-1639 (Grouping using hashing instead of sorting) is described as a
> replacement for the map-side sort only. MAPREDUCE-3247 (Add hash aggregation
> style data flow and/or new API) and MAPREDUCE-4039 (Sort Avoidance) are
> mostly focused on sort-free MapReduce and do not try to guarantee the
> computation model at the framework level. As the process above shows, this
> work is a complete hash-based approach: the sort at the map side and the
> merge at the reduce side are entirely replaced by hashing, while the
> computation model of MapReduce is still guaranteed.
>  
> One potential effect of using hashing without sorting is that MapReduce
> users should no longer depend on the order of keys. An ordering of the keys
> is implied by the sort-merge process, but it is no longer implied when
> hashing is used to group the keys.
>  
> This work is implemented on top of the pluggable MapOutputCollector (map
> side) and ShuffleConsumerPlugin (reduce side) provided by MAPREDUCE-2454.
> There are no modifications to the existing MapReduce code, which keeps the
> effect on the original implementation to a minimum. Hash-based MapReduce is
> not used by default. To enable it, set
> “mapreduce.job.map.output.collector.class” to the HashMapOutputCollector
> class and “mapreduce.job.reduce.shuffle.consumer.plugin.class” to the
> HashShuffle class.
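>
> For example, a job could enable both plugins like this (the package names of
> the new classes are illustrative only):
>
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.Job;
>
> public class EnableHashMapReduce {
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // Replace the default sort-based collector and shuffle with the hash-based plugins.
>         conf.set("mapreduce.job.map.output.collector.class",
>                  "org.apache.hadoop.mapred.HashMapOutputCollector");      // package name illustrative
>         conf.set("mapreduce.job.reduce.shuffle.consumer.plugin.class",
>                  "org.apache.hadoop.mapreduce.task.reduce.HashShuffle");  // package name illustrative
>         Job job = Job.getInstance(conf, "hash-based wordcount");
>         // ... set mapper, reducer, input and output as usual, then job.waitForCompletion(true)
>     }
> }
> {code}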
