Improve the performance of the Merge phase
------------------------------------------

                 Key: HADOOP-830
                 URL: http://issues.apache.org/jira/browse/HADOOP-830
             Project: Hadoop
          Issue Type: Improvement
          Components: mapred
            Reporter: Devaraj Das
         Assigned To: Devaraj Das


This issue is about improving the performance of the merge phase 
(especially on the reduces). Currently, all the map outputs are copied to disk 
and only then does the merge phase start (as a note, sorting happens on the 
maps).

The first optimization I plan to implement is in-memory merging of the map 
outputs. Two buffers are maintained - 
1) a scratch buffer for writing map outputs (directly off the socket). This is 
a first-come-first-serve buffer (as opposed to strategies like best fit). The 
map output copier copies the map output off the socket and puts it here, 
assuming there is sufficient space in the buffer.
2) a merge buffer - when the scratch buffer cannot accommodate any more map 
output, the roles of the buffers are switched - that is, the scratch buffer 
becomes the merge buffer and the merge buffer becomes the scratch buffer. This 
switch of roles avoids copying data between buffers. The copier threads can 
continue writing data from the socket buffer to the current scratch buffer 
(access to the scratch buffer is synchronized). 

Both buffers are of equal size, with a default of 100M each.
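
To make the buffer management concrete, here is a minimal sketch of the 
first-come-first-serve scratch buffer. All class and method names here are 
hypothetical, for illustration only - not from an actual patch:

// Hypothetical sketch of the first-come-first-serve scratch buffer.
// Copiers reserve a contiguous region at the tail; there is no
// best-fit search over free regions.
public class ScratchBuffer {
  private final byte[] data;
  private int used; // next free offset

  public ScratchBuffer(int capacity) {
    this.data = new byte[capacity];
  }

  // Reserve len bytes at the tail of the buffer. Returns the offset of
  // the reservation, or -1 if the output does not fit (which triggers
  // the role switch described below, or a direct spill to disk).
  public synchronized int reserve(int len) {
    if (used + len > data.length) {
      return -1;
    }
    int offset = used;
    used += len;
    return offset;
  }

  public byte[] array() { return data; }
  public synchronized int size() { return used; }
  public synchronized void reset() { used = 0; }
}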

Before switching roles, a check is done to see whether the merge buffer is in 
use (i.e., the merge algorithm is still working on the data there); if so, we 
wait until the merge buffer is free. The hope is that since the merge reads 
key/value data from an in-memory buffer it will be really fast, so we won't 
see client timeouts on the server serving the map output. If a timeout does 
occur, the client sees an exception and resubmits the request to the server.
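
Building on the ScratchBuffer sketch above, the role switch might look like 
the following: a copier that finds the scratch buffer full blocks until the 
merge thread releases the other buffer (again, names are illustrative):

// Hypothetical sketch of the buffer role switch. Swapping references
// avoids copying data; a copier blocks here only while a merge is
// still reading the other buffer.
public class BufferPair {
  private ScratchBuffer scratch = new ScratchBuffer(100 * 1024 * 1024);
  private ScratchBuffer merge = new ScratchBuffer(100 * 1024 * 1024);
  private boolean mergeInProgress = false;

  // Called by a copier when the scratch buffer is full. Returns the
  // buffer the merge thread should now drain.
  public synchronized ScratchBuffer switchRoles() throws InterruptedException {
    while (mergeInProgress) {
      wait(); // merge thread still owns the other buffer
    }
    ScratchBuffer tmp = scratch; // swap references, no data copied
    scratch = merge;
    merge = tmp;
    scratch.reset(); // the freshly merged buffer becomes empty scratch
    mergeInProgress = true;
    return merge;
  }

  // Called by the merge thread after it has merged and spilled to disk.
  public synchronized void mergeDone() {
    mergeInProgress = false;
    notifyAll(); // wake any copier blocked in switchRoles()
  }
}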

With the above, copying and merging proceed in parallel.

The merge happens and is followed by a spill to disk. At the end of the 
in-memory merge of all the map outputs, we will end up with on-disk files of 
~100M each that we will need to merge. The in-memory merge is triggered either 
when the in-memory scratch buffer has been idle too long (e.g., 30 secs), or 
when the number of outputs copied so far equals the number of maps in the job, 
whichever comes first. We can proceed with the regular merge for these on-disk 
files, and maybe we can do some optimizations there too (I haven't put much 
thought into that yet).
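
The trigger check itself could be as simple as this sketch (the 30-second 
threshold is the example value from above; method and parameter names are 
made up):

// Hypothetical trigger for the in-memory merge: fire when the scratch
// buffer has been idle for ~30 seconds, or when every map's output has
// already been copied, whichever comes first.
static boolean shouldTriggerMerge(long lastWriteMillis, int outputsCopied,
                                  int numMaps) {
  long idleMillis = System.currentTimeMillis() - lastWriteMillis;
  return idleMillis > 30 * 1000L || outputsCopied == numMaps;
}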

If a map output can never be copied to the buffer (because the map output is, 
let's say, 200M), it is spilled directly to disk.
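
A one-line sketch of that size check (names are made up for illustration):

// Hypothetical check: a map output larger than the scratch buffer's
// capacity (e.g. 200M vs. a 100M buffer) can never be held in memory,
// so the copier writes it straight to disk.
static boolean mustSpillDirectly(long mapOutputSize, long bufferCapacity) {
  return mapOutputSize > bufferCapacity;
}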

To implement the above, I am planning to extend the FileSystem class with an 
InMemoryFileSystem class that will ease integrating the in-memory 
scratch/merge buffers with the existing APIs (like SequenceFile and 
MapOutputCopier), since all of them work with the FileSystem and Input/Output 
stream abstractions.
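
As a self-contained sketch of the idea - a simplified stand-in, not the real 
org.apache.hadoop.fs.FileSystem contract, which has a much larger surface:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the proposed InMemoryFileSystem: it hands out
// ordinary Input/Output streams over byte arrays, so code written
// against stream abstractions (as SequenceFile and the map output
// copier are) can be pointed at RAM instead of disk.
public class InMemoryFileSystem {
  private final Map<String, ByteArrayOutputStream> files =
      new ConcurrentHashMap<String, ByteArrayOutputStream>();

  // Create (or overwrite) an in-memory file and return a stream to it.
  public OutputStream create(String path) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    files.put(path, out);
    return out;
  }

  // Open an existing in-memory file for reading.
  public InputStream open(String path) throws FileNotFoundException {
    ByteArrayOutputStream out = files.get(path);
    if (out == null) {
      throw new FileNotFoundException(path);
    }
    return new ByteArrayInputStream(out.toByteArray());
  }

  // Delete a file, freeing its buffer (e.g. after a spill to disk).
  public boolean delete(String path) {
    return files.remove(path) != null;
  }
}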

Comments?
