Improve the performance of the Merge phase
------------------------------------------
Key: HADOOP-830
URL: http://issues.apache.org/jira/browse/HADOOP-830
Project: Hadoop
Issue Type: Improvement
Components: mapred
Reporter: Devaraj Das
Assigned To: Devaraj Das
This issue is about trying to improve the performance of the merge phase
(especially on the reduces). Currently, all the map outputs are copied to disk
and then the merge phase starts (just to make a note - sorting happens on the
maps).
The first optimization that I plan to implement is to do in-memory merging of
the map outputs. There are two buffers maintained -
1) a scratch buffer for writing map outputs (directly off the socket). This is
a first-come-first-serve buffer (as opposed to strategies like best fit). The
map output copier copies the map output off the socket and puts it here
(assuming there is sufficient space in the buffer).
2) a merge buffer - when the scratch buffer cannot accomodate any more map
output, the roles of the buffers are switched - that is, the scratch buffer
becomes the merge buffer and the merge buffer becomes the scratch buffer. We
avoid copying by doing this switch of roles. The copier threads can continue
writing data from the socket buffer to the current scratch buffer (access to
the scratch buffer is synchronized).
Both the above buffers are of equal sizes configured to have default values of
100M.
Before switching roles, a check is done to see whether the merge buffer is in
use (merge algorithm is working on the data there). We wait till the merge
buffer is free. The hope is that while merging we are reading key/value data
from an in-memory buffer and it will be really fast and so we won't see client
timeouts on the server serving the map output. However, if they really timeout,
the client sees an exception, and resubmits the request to the server.
With the above we are doing copying/merging in parallel.
The merge happens and then a spill to disk happens. At the end of the in-memory
merge of all the map outputs, we will end up with ~100M files on disk that we
will need to merge. Also, the in-memory merge gets triggered when the in-memory
scratch buffer has been idle too long (like 30 secs), or, the number of outputs
copied so far is equal to the number of maps in the job, whichever is earlier.
We can proceed with the regular merge for these on-disk-files and maybe we can
do some optimizations there too (haven't put much thought there).
If the map output can never be copied to the buffer (because the map output is
let's say 200M), then that is directly spilled to disk.
To implement the above, I am planning to extend the FileSystem class to provide
an InMemoryFileSystem class that will ease the integration of the in-memory
scratch/merge with the existing APIs (like SequenceFile, MapOutputCopier) since
all them work with the abstractions of FileSystem and Input/Output streams.
Comments?
--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira