(Bringing an offline discussion to the email list. More context is in the thread below.)
We do plan to increase the types of events that get sent to tasks. Currently we only send a taskcompletionevent to the consumer task, informing it about its input. Sending more events would need plumbing changes in the TaskUmbilical and probably some refactoring of the event hierarchy. Before we do that, we need to think through, to some degree, when and what events make sense, so that we can make a properly designed improvement to the AM code.

Bikas

============

Hmm, interesting. I see that there is currently no need to tell mappers about the runtime parallelism decisions, because you just inform the reducers of which ranges to fetch. Would you be opposed to having events sent to map tasks?

Achal

On Tue, Aug 20, 2013 at 11:25 AM, Bikas Saha wrote:

No events go to the Map task before it completes. The only events that go to the tasks are completion events that tell them about the completion of their inputs. So currently there is no way to tell the mapper, while it's running, to do something else.

Bikas

*From:* Achal Soni
*Sent:* Tuesday, August 20, 2013 11:18 AM
*To:* Bikas Saha
*Subject:* Re: Shuffle

Yes, this sounds like a good solution, and similar to what I am thinking of as well. In Pig, whenever range partitioning is involved, a sampling job is run that reads through all the inputs. There is special logic for sampling; for example, for a skewed join it does special sampling for certain skewed keys. That is where Tez would come in: it would generate histograms, and user code would then figure out the ranges according to whatever metrics it wants. We could provide a default implementation that determines ranges based on the amount of data the keys generate. (Pig may do something like extrapolating the amount of work certain keys will cause in the next MR job.)

I believe that in MR, the reducer goes to each map task via HTTP and requests its partition.
The mapper has a single, consolidated spill file with all of its output data, plus an index on this spill file with the starting offset and length of each partition (one partition per reducer in traditional MR; in your runtime parallelism scheme, a reducer may fetch more than one partition), and it serves the data accordingly. I am thinking of building on your runtime parallelism code to send the histogram from each mapper via event updates and then consolidate the histograms, similar to "determineAndApplyParallelism". The mappers would then receive event updates with the new ranges, so that when they create the index for each partition, they can update it appropriately. Then, when the reducers come to the mapper in the copy phase, we don't need to touch the ShuffleHandler code that handles the HTTP connection, since the partition indices would already be updated.

I am just learning the code base myself right now, so if you find any inconsistencies or errors, do let me know!

- Achal

On Tue, Aug 20, 2013 at 10:17 AM, Bikas Saha wrote:

There is a well-known solution (or at least I know of it from an earlier life) to dynamic range partitioning with different numbers of partitions. Basically, the “partitioner” just leaves a sorted output and a sample. Using the sample, we can determine the right number of reducers and their ranges. The only problem is how to serve the right section of the sorted file to the relevant reducer. For example, say we only want to give reducer R the portion of the sorted file covering values F-K. The file server could seek to F in the file and then serve data up to K. To seek quickly to F, it needs to build an index when writing the file. The index can get very large if it's too fine-grained, so we keep an index proportional to the expected number of reducers. Then, when the reducers ask for their data, the file server gives each one a section of the file that “contains” the data of interest, which may be a little more than needed. The reducer can filter on its side.
Or the server can filter on its side and send the exact range, whichever suits the implementation. I believe what you are suggesting below is along these lines.

Bikas
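
The following is a minimal sketch of the sample-based scheme Bikas describes, assuming string keys, an in-memory sorted sample, and a coarse index that stores the first key and starting byte offset of each block of the sorted map output. RangePartitionSketch, IndexEntry, splitPoints, containingSection and filter are hypothetical names chosen for illustration; they are not Tez or MapReduce APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: names below are illustrative, not Tez or MapReduce APIs.
class RangePartitionSketch {

    // One coarse index entry: the first key of a block in the sorted output
    // file and the byte offset where that block starts.
    static final class IndexEntry {
        final String firstKey;
        final long offset;

        IndexEntry(String firstKey, long offset) {
            this.firstKey = firstKey;
            this.offset = offset;
        }
    }

    // Choose numReducers - 1 split keys as equally spaced quantiles of a sorted
    // sample. The first reducer takes keys below splits[0], reducer i takes
    // [splits[i-1], splits[i]), and the last takes everything from the last split up.
    static String[] splitPoints(List<String> sortedSample, int numReducers) {
        String[] splits = new String[numReducers - 1];
        for (int i = 1; i < numReducers; i++) {
            splits[i - 1] = sortedSample.get(i * sortedSample.size() / numReducers);
        }
        return splits;
    }

    // Return {start, end} byte offsets of a section containing every record with
    // key in [lo, hi); hi == null means "to the end of the file". The section may
    // also hold some keys outside the range, which are filtered out later.
    static long[] containingSection(List<IndexEntry> index, long fileLength,
                                    String lo, String hi) {
        long start = 0;
        long end = fileLength;
        for (IndexEntry e : index) {
            // Last block whose first key is strictly below lo; earlier blocks
            // cannot hold lo, even if lo is duplicated across a block boundary.
            if (e.firstKey.compareTo(lo) < 0) {
                start = e.offset;
            }
            // The first block whose first key is >= hi holds no key of this range.
            if (hi != null && e.firstKey.compareTo(hi) >= 0) {
                end = e.offset;
                break;
            }
        }
        return new long[] {start, end};
    }

    // Reducer-side (or server-side) filter that trims a section to [lo, hi).
    static List<String> filter(List<String> keys, String lo, String hi) {
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            if (k.compareTo(lo) >= 0 && (hi == null || k.compareTo(hi) < 0)) {
                out.add(k);
            }
        }
        return out;
    }
}
```

Keeping the index coarse (roughly one entry per expected reducer) bounds its size; the price is that containingSection can return extra bytes at both edges, which filter then trims on whichever side does the filtering.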
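
Achal's idea of consolidating per-mapper histograms and picking ranges by the amount of data each key generates might look roughly like the sketch below. It assumes each mapper reports a key-to-bytes histogram, and it uses a greedy equal-weight split as a stand-in for whatever policy the user code chooses. HistogramRanges, merge and splitByWeight are hypothetical names; this is not the determineAndApplyParallelism code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: not Tez's determineAndApplyParallelism.
class HistogramRanges {

    // Sum per-mapper histograms (key -> bytes produced) into one histogram
    // sorted by key.
    static TreeMap<String, Long> merge(List<Map<String, Long>> perMapper) {
        TreeMap<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> histogram : perMapper) {
            for (Map.Entry<String, Long> e : histogram.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return merged;
    }

    // Greedy equal-weight split: walk keys in sorted order and open a new range
    // at the first key reached once the bytes seen so far hit the next multiple
    // of total / numRanges. Returns the first key of every range but the first.
    static List<String> splitByWeight(TreeMap<String, Long> histogram, int numRanges) {
        long total = 0;
        for (long bytes : histogram.values()) {
            total += bytes;
        }
        double target = (double) total / numRanges;
        List<String> splits = new ArrayList<>();
        long seen = 0;
        for (Map.Entry<String, Long> e : histogram.entrySet()) {
            if (splits.size() < numRanges - 1 && seen >= target * (splits.size() + 1)) {
                splits.add(e.getKey());
            }
            seen += e.getValue();
        }
        return splits;
    }
}
```

Because a key is never split across ranges, a single very heavy key still lands in one range; that is the skewed-join case Pig handles with its special sampling.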
