[ http://issues.apache.org/jira/browse/HADOOP-331?page=all ]
Devaraj Das updated HADOOP-331:
-------------------------------

    Attachment: 331-initial3.patch

This is the first version of the patch for review. While testing is in
progress, I thought the review could go on in parallel. The main changes are:

1) The reduces don't sort anymore. One change in ReduceTask.java has to do
   with that: the "sort" string is replaced by "merge" in a couple of places.

2) The other change in ReduceTask.java is that the ValuesIterator class has
   been upgraded to a package-private static class, because I am using its
   functionality in the Combiner as well. Thus, I have two new classes
   extending ValuesIterator: ReduceValuesIterator and CombineValuesIterator,
   defined in ReduceTask.java and MapTask.java respectively.

3) In MapTask.java, I have a couple of other new classes:
   3.1 MapOutputBuffer: this class does all the work of maintaining the
       buffer, sorting, combining and spilling. It implements the
       OutputCollector interface and a new interface called OutputWriter.
       The second interface exists so that other classes can invoke the
       methods for writing the partition boundaries (syncs) and doing the
       final merge of the spills.

4) The sort has been refactored as an interface, SorterBase.java (which
   extends the RawKeyValueIterator interface). It defines a couple of methods
   that the MapOutputBuffer invokes during the map phase. The reason for
   extending RawKeyValueIterator is to allow easy iteration over the sorted
   data (during combining/spilling). Also, during iteration the OutputWriter
   in MapTask.java is notified of partition changes so that it can do things
   like ending a block boundary by writing a sync (for block compression),
   etc.

5) Implemented the SorterBase interface as a class BasicTypeSorterBase that
   implements the methods and the relevant data structures as arrays of
   primitive int (as opposed to object arrays, which could be implemented by
   another class implementing the SorterBase interface).
   The intent of BasicTypeSorterBase class is to serve all implementations of
   sort algos that rely on primitive arrays (of offsets/lengths/etc. to a
   read-only buffer). The sort() method itself is empty here.

6) MergeSorter.java extends from BasicTypeSorterBase.java and implements the
   sort method. Basically, it accesses the data structures that
   BasicTypeSorterBase created and sets up the input arguments for
   hadoop.util.MergeSort (that actually implements the core of the MergeSort
   algo). One could have a QuickSorter.java and a corresponding
   hadoop.util.QuickSort.java and so on. The bridge between the framework and
   the sort algo is the Comparator that is passed by the framework to the
   hadoop.util.MergeSort's constructor.

7) What sort algo to use in the framework can be set by map.sort.class which
   defaults to MergeSorter.class (for now).

8) TaskTracker.java has the necessary jsp changes to serve the map output
   data to the reduces.

9) SequenceFile.java has two new public APIs: createWriter and sync (this
   createWriter has slightly different args). Also refactored sort to use the
   new mergesort from hadoop.util.

10) A new class hadoop.util.ListOfArrays has been introduced to maintain
    lists of primitive arrays and handle things like growing of arrays
    internally.

> map outputs should be written to a single output file with an index
> -------------------------------------------------------------------
>
>                 Key: HADOOP-331
>                 URL: http://issues.apache.org/jira/browse/HADOOP-331
>             Project: Hadoop
>          Issue Type: Improvement
>          Components: mapred
>    Affects Versions: 0.3.2
>            Reporter: eric baldeschwieler
>         Assigned To: Devaraj Das
>         Attachments: 331-design.txt, 331-initial3.patch, 331.txt
>
>
> The current strategy of writing a file per target map is consuming a lot of
> unused buffer space (causing out of memory crashes) and puts a lot of burden
> on the FS (many opens, inodes used, etc).
>
> I propose that we write a single file containing all output and also write an
> index file IDing which byte range in the file goes to each reduce. This will
> remove the issue of buffer waste, address scaling issues with number of open
> files and generally set us up better for scaling. It will also have
> advantages with very small inputs, since the buffer cache will reduce the
> number of seeks needed and the data serving node can open a single file and
> just keep it open rather than needing to do directory and open ops on every
> request.
>
> The only issue I see is that in cases where the task output is substantially
> larger than its input, we may need to spill multiple times. In this case, we
> can do a merge after all spills are complete (or during the final spill).
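
For illustration, the single-file-plus-index layout proposed in the description
could look roughly like the sketch below. This is not code from the attached
patch. The class SpillIndexSketch, its method names, the file names and the
index entry layout (start offset and length, stored as two longs per reduce)
are all assumptions made for the example.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: none of these names come from the HADOOP-331 patch.
final class SpillIndexSketch {
    // Assumed index layout: one entry per reduce, two longs (offset, length).
    static final int ENTRY_BYTES = 2 * Long.BYTES;

    /** Writes all partitions back to back into dataFile, plus one index entry each. */
    static void write(byte[][] partitions, Path dataFile, Path indexFile)
            throws IOException {
        try (OutputStream data = Files.newOutputStream(dataFile);
             DataOutputStream index =
                     new DataOutputStream(Files.newOutputStream(indexFile))) {
            long offset = 0;
            for (byte[] partition : partitions) {
                data.write(partition);
                index.writeLong(offset);            // where this reduce's bytes start
                index.writeLong(partition.length);  // how many bytes belong to it
                offset += partition.length;
            }
        }
    }

    /** Returns the bytes for one reduce by looking up its byte range in the index. */
    static byte[] read(int reduce, Path dataFile, Path indexFile) throws IOException {
        long offset;
        long length;
        try (RandomAccessFile index = new RandomAccessFile(indexFile.toFile(), "r")) {
            index.seek((long) reduce * ENTRY_BYTES);
            offset = index.readLong();
            length = index.readLong();
        }
        byte[] out = new byte[(int) length];
        try (RandomAccessFile data = new RandomAccessFile(dataFile.toFile(), "r")) {
            data.seek(offset);   // a single seek, no per-reduce file open
            data.readFully(out);
        }
        return out;
    }

    /** Convenience for trying the sketch: write to temp files, read one range back. */
    static byte[] roundTrip(byte[][] partitions, int reduce) {
        try {
            Path dir = Files.createTempDirectory("spill-sketch");
            Path dataFile = dir.resolve("file.out");
            Path indexFile = dir.resolve("file.out.index");
            try {
                write(partitions, dataFile, indexFile);
                return read(reduce, dataFile, indexFile);
            } finally {
                Files.deleteIfExists(dataFile);
                Files.deleteIfExists(indexFile);
                Files.deleteIfExists(dir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a layout like this, the node serving map output needs one fixed-size index
lookup and one seek per request, and it can keep the single data file open
across requests instead of opening one file per reduce.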