[ https://issues.apache.org/jira/browse/TEZ-1094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajesh Balamohan updated TEZ-1094:
----------------------------------
    Attachment: TEZ-1094.3.patch

Removed "finalMergeEnabled=false" case in UnorderedPartitionedKVWriter. It
isn't adding much value in this case. We can add it later on need basis.

writeLargeRecord - needs to send out an event in case of pipelined shuffle.
- Fixed. Added test to cover this.

Empty partition details for each event are being generated from the global
stats - which means they're cumulative. This should be generated from each
Buffer before the buffer is reset. This likely implies storing the bitSet
for each event that has been generated - especially for enableMerge = false,
pipelined = false - since events are generated at the end in this case.
(Alternate is to generate a single uber event at the end for the non
pipelined case - but that's not part of this jira)
- Fixed. Getting the stats from individual buffers before reset.
  Constructing emptypartition bitset out of this.

Is it possible to get rid of getSpillIndex - maybe storing the spillIndex
within SpillPathDetails. getSpillIndex would not work correctly with
multiple spill threads.
- Fixed

In the close() method "if (currentBuffer.nextPosition == 0) {" - This will
send out an event with invalid partition information (global). More
important, in case of mergeEnabled=false - this is going to skip all
previous events, since it returns immediately after this event. Should this
event just be generated with a null pathComponent and a fully set BitSet.
- Fixed. Setting bits for all partitions and setting pathcomponent to null
  in this case.

calling finalSpill() ends up mixing filenames for the pipelined case.
finalSpill will always generate the final output file name as it is today.
- For the pipelined/mergeDisabled case, we generate the spill filenames
  ending with "_spillId". Otherwise, the last spill event would not be
  retrieved in consumer side. If it is not pipelined, it would generate the
  final output file as it is today.
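The per-buffer empty-partition handling discussed above could be sketched
roughly as follows. This is only an illustration of the idea (take the stats
from each buffer before it is reset, and use a fully set BitSet when the last
buffer holds no data). It is not the code in the patch: EmptyPartitionSketch,
BufferStats, emptyPartitions and allPartitionsEmpty are hypothetical names,
and only java.util.BitSet is a real API here.

```java
import java.util.Arrays;
import java.util.BitSet;

public class EmptyPartitionSketch {

    /** Hypothetical stand-in for per-buffer record counts; not a Tez class. */
    static final class BufferStats {
        final int[] recordsPerPartition;

        BufferStats(int numPartitions) {
            recordsPerPartition = new int[numPartitions];
        }

        void record(int partition) {
            recordsPerPartition[partition]++;
        }

        /** Clears the counts, as would happen when a buffer is reused. */
        void reset() {
            Arrays.fill(recordsPerPartition, 0);
        }
    }

    /** Builds the empty-partition bits from one buffer's own counts. */
    static BitSet emptyPartitions(BufferStats stats) {
        int n = stats.recordsPerPartition.length;
        BitSet empty = new BitSet(n);
        for (int p = 0; p < n; p++) {
            if (stats.recordsPerPartition[p] == 0) {
                empty.set(p);
            }
        }
        return empty;
    }

    /** All bits set: the case where close() finds nothing in the buffer. */
    static BitSet allPartitionsEmpty(int numPartitions) {
        BitSet all = new BitSet(numPartitions);
        all.set(0, numPartitions);
        return all;
    }

    public static void main(String[] args) {
        BufferStats stats = new BufferStats(4);
        stats.record(1);
        stats.record(3);
        // Capture the bits for this spill before the buffer is reset.
        BitSet firstSpill = emptyPartitions(stats);
        stats.reset();

        stats.record(0);
        BitSet secondSpill = emptyPartitions(stats);

        // Each event reflects only its own buffer, not cumulative stats.
        System.out.println(firstSpill + " " + secondSpill);
    }
}
```

With cumulative (global) stats, the second event would have reported only
partition 2 as empty, even though the second buffer holds data for partition 0
alone.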
I think the rest of the code to generate events may change depending on how
emptyPartition details are handled (Likely a list of previous spills - which
can be used to generate events rather than relying on getSpillIndex+1).

ShuffleManager: "if (shuffleInfoEventsMap.get(srcAttemptIdentifier) ==
null)" - Shouldn't the following else if attemptNumber == 0 check always be
checked ? Otherwise it can let attemptNumber = 1 into the pending list.
- Fixed. Reversed the check to handle this.

ShuffleManager: On numFetchedSpills - was thinking the log would be
something like copy(inputsDone) (spillsDone) of total numInputs complete.
Otherwise it can be a little misleading as to how the shuffle is
progressing.
- Fixed. Added numFetchedSpills to the log.

Nit: maybeWriteSpillIndex - rename to handleSpillIndex since it's doing more
than just writing to the file.
- Fixed

Minor: spillIndex isn't really used in SpillCallable - pass in as a String
for logging purposes, so that there isn't confusion in the future
(especially in case of finalSpill where this is a random value)
- Removed spillIndex. It is now available as a part of SpillPathDetails.

Minor: getSpillPathDetails doesn't need to set finalOutPath and
finalIndexPath (unless it's being used by tests?)
- Yes, it is set for testing purpose. Added a comment.

> Support pipelined data transfer for Unordered Output
> ----------------------------------------------------
>
>                 Key: TEZ-1094
>                 URL: https://issues.apache.org/jira/browse/TEZ-1094
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Siddharth Seth
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-1094.1.patch, TEZ-1094.2.patch, TEZ-1094.3.patch
>
>
> For unsorted output (and possibly for sorted output), it should be possible
> to send data in small batches instead of waiting for everything to be
> generated before transmitting. For now, planning on getting started with
> UnsortedOutput / Input pairs.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)