[ http://issues.apache.org/jira/browse/HADOOP-331?page=all ]

Devaraj Das updated HADOOP-331:
-------------------------------

    Attachment: 331-initial3.patch

This is the first version of the patch for review. Testing is still in 
progress, but I thought the review could go on in parallel. The main changes are:
1) The reduces no longer sort. One change in ReduceTask.java reflects that: 
the string "sort" is replaced by "merge" in a couple of places.
2) The other change in ReduceTask.java is that the ValuesIterator class has 
been promoted to a package-private static class, because I am using the 
functionality of this class in the Combiner as well. Thus, I have two new classes 
extending ValuesIterator: ReduceValuesIterator and CombineValuesIterator, 
defined in ReduceTask.java and MapTask.java respectively.
3) In MapTask.java, I have a couple of other new classes:
   3.1 MapOutputBuffer: this class does all the work of maintaining the 
buffer, sorting, combining and spilling. It implements the OutputCollector interface 
and a new interface called OutputWriter. The intent of having the second 
interface is for other classes to invoke the methods for writing the partition 
boundaries (syncs) and doing the final merge of the spills.
4) The sort has been refactored into an interface, SorterBase.java (which extends 
the RawKeyValueIterator interface). It defines a couple of methods that the 
MapOutputBuffer invokes during the map phase. The reason for extending 
RawKeyValueIterator is to allow easy iteration over the sorted data (during 
combining/spilling). Also, during iteration the OutputWriter in MapTask.java is 
notified of partition changes so that it can do things like marking a block 
boundary by writing a sync (for block compression), etc.
5) Implemented the SorterBase interface as a class, BasicTypeSorterBase, that 
implements the methods and keeps the relevant data structures as arrays of 
primitive int (as opposed to object arrays, which could be used by another class 
implementing the SorterBase interface). The intent of the BasicTypeSorterBase 
class is to serve all implementations of sort algos that rely on primitive arrays 
(of offsets/lengths/etc. into a read-only buffer). The sort() method itself is 
empty here.
6) MergeSorter.java extends from BasicTypeSorterBase.java and implements the 
sort method. Basically, it accesses the data structures that 
BasicTypeSorterBase created and sets up the input arguments for 
hadoop.util.MergeSort (that actually implements the core of the MergeSort 
algo). One could have a QuickSorter.java and a corresponding 
hadoop.util.QuickSort.java and so on. The bridge between the framework and the 
sort algo is the Comparator that is passed by the framework to the 
hadoop.util.MergeSort's constructor.
7) The sort algo used by the framework can be set via map.sort.class, which 
defaults to MergeSorter.class (for now).
8) TaskTracker.java has the necessary jsp changes to serve the map output data 
to the reduces.
9) SequenceFile.java has two new public APIs: createWriter and sync (this 
createWriter has slightly different args). Also refactored sort to use the new 
mergesort from hadoop.util.
10) A new class hadoop.util.ListOfArrays has been introduced to maintain lists 
of primitive arrays and handle things like growing of arrays internally.
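
To illustrate the idea behind items 5 and 6, here is a minimal sketch of sorting 
records through primitive int arrays of offsets and lengths into a read-only 
buffer, with a comparator on the raw key bytes as the only link to the data. It 
is not code from the patch: the class OffsetSortSketch and its methods are 
hypothetical names, the byte-wise comparison stands in for whatever Comparator 
the framework passes in, and partitions are ignored for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; these are NOT the classes in the patch.
class OffsetSortSketch {

    /** Compares two key ranges of the same buffer as unsigned bytes (assumed ordering). */
    static int compareRaw(byte[] buf, int off1, int len1, int off2, int len2) {
        int n = Math.min(len1, len2);
        for (int i = 0; i < n; i++) {
            int a = buf[off1 + i] & 0xff;
            int b = buf[off2 + i] & 0xff;
            if (a != b) {
                return a - b;
            }
        }
        return len1 - len2;
    }

    /**
     * Returns the record indices in sorted key order. The buffer and the
     * offset/length arrays are never modified; only the int index array moves.
     */
    static int[] sortedOrder(byte[] buf, int[] offsets, int[] lengths) {
        int[] order = new int[offsets.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        mergeSort(buf, offsets, lengths, order, new int[order.length], 0, order.length);
        return order;
    }

    /** Top-down merge sort of order[lo, hi) using scratch as temporary space. */
    private static void mergeSort(byte[] buf, int[] offsets, int[] lengths,
                                  int[] order, int[] scratch, int lo, int hi) {
        if (hi - lo < 2) {
            return;
        }
        int mid = (lo + hi) >>> 1;
        mergeSort(buf, offsets, lengths, order, scratch, lo, mid);
        mergeSort(buf, offsets, lengths, order, scratch, mid, hi);
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) {
            int a = order[i], b = order[j];
            // "<= 0" takes from the left half on ties, which keeps the sort stable.
            if (compareRaw(buf, offsets[a], lengths[a], offsets[b], lengths[b]) <= 0) {
                scratch[k++] = order[i++];
            } else {
                scratch[k++] = order[j++];
            }
        }
        while (i < mid) {
            scratch[k++] = order[i++];
        }
        while (j < hi) {
            scratch[k++] = order[j++];
        }
        System.arraycopy(scratch, lo, order, lo, hi - lo);
    }

    public static void main(String[] args) {
        // Three keys packed back to back: "pear", "apple", "fig".
        byte[] buf = "pearapplefig".getBytes(StandardCharsets.US_ASCII);
        int[] offsets = {0, 4, 9};
        int[] lengths = {4, 5, 3};
        System.out.println(Arrays.toString(sortedOrder(buf, offsets, lengths))); // [1, 2, 0]
    }
}
```

Because only the int array is permuted, the records themselves are never copied 
during the sort; iterating over the result means following the sorted indices 
back into the buffer.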

> map outputs should be written to a single output file with an index
> -------------------------------------------------------------------
>
>                 Key: HADOOP-331
>                 URL: http://issues.apache.org/jira/browse/HADOOP-331
>             Project: Hadoop
>          Issue Type: Improvement
>          Components: mapred
>    Affects Versions: 0.3.2
>            Reporter: eric baldeschwieler
>         Assigned To: Devaraj Das
>         Attachments: 331-design.txt, 331-initial3.patch, 331.txt
>
>
> The current strategy of writing a file per target map is consuming a lot of 
> unused buffer space (causing out of memory crashes) and puts a lot of burden 
> on the FS (many opens, inodes used, etc).  
> I propose that we write a single file containing all output and also write an 
> index file IDing which byte range in the file goes to each reduce.  This will 
> remove the issue of buffer waste, address scaling issues with number of open 
> files and generally set us up better for scaling.  It will also have 
> advantages with very small inputs, since the buffer cache will reduce the 
> number of seeks needed and the data serving node can open a single file and 
> just keep it open rather than needing to do directory and open ops on every 
> request.
> The only issue I see is that in cases where the task output is substantially 
> larger than its input, we may need to spill multiple times.  In this case, we 
> can do a merge after all spills are complete (or during the final spill).
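
The proposal quoted above (one output file plus an index of byte ranges per 
reduce) can be sketched as follows. This is an illustration under stated 
assumptions, not the format used by the patch: the class SingleFileIndexSketch 
is a hypothetical name, an in-memory byte array stands in for the output file, 
and the index is a plain int array of start offsets with one trailing end offset.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch; an in-memory array stands in for the single map output file.
class SingleFileIndexSketch {
    /** All partitions concatenated in reduce order. */
    final byte[] data;
    /** index[r] is the start of reduce r's range; index[r + 1] is its end. */
    final int[] index;

    SingleFileIndexSketch(byte[][] partitions) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        index = new int[partitions.length + 1];
        for (int r = 0; r < partitions.length; r++) {
            index[r] = out.size();
            out.write(partitions[r], 0, partitions[r].length);
        }
        index[partitions.length] = out.size();
        data = out.toByteArray();
    }

    /** Returns the bytes destined for one reduce, using only the index. */
    byte[] read(int reduce) {
        return Arrays.copyOfRange(data, index[reduce], index[reduce + 1]);
    }

    public static void main(String[] args) {
        byte[][] parts = {
            "aa".getBytes(StandardCharsets.US_ASCII),
            new byte[0], // a reduce with no output still gets an (empty) range
            "bcd".getBytes(StandardCharsets.US_ASCII)
        };
        SingleFileIndexSketch f = new SingleFileIndexSketch(parts);
        System.out.println(Arrays.toString(f.index)); // [0, 2, 2, 5]
        System.out.println(new String(f.read(2), StandardCharsets.US_ASCII)); // bcd
    }
}
```

Serving a reduce then needs one open file and one range read instead of a 
directory lookup and an open per request.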

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
