[ 
https://issues.apache.org/jira/browse/TEZ-2358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2358:
----------------------------------
    Attachment: TEZ-2358.1.patch

Since the spill_Id was missing from the file name, successive spills ended up clobbering each other's files. Attaching the .1 patch, which makes the file name unique using srcId & spillId.

[~gopalv], [~sseth] Please have a look at the patch. The MAX_VALUE used in the patch would not cause a collision, as it is handled in finalMerge (which is invoked exactly once during merge close), and exactly one file with that spill id is generated during the final merge.
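
To illustrate the clobbering for reviewers, here is a minimal standalone sketch (hypothetical helper names, not the actual Tez MapOutputFile API): with a path keyed only on the source task id, every spill from the same source maps to the same file, while appending the spill id keeps each merged file distinct.

```java
import java.util.HashSet;
import java.util.Set;

public class SpillPathSketch {
    // Old behavior: path keyed only on srcTaskId, so two pipelined spills
    // from the same source task resolve to the same file and clobber it.
    static String pathWithoutSpillId(int srcTaskId) {
        return "attempt_X_" + srcTaskId + ".out.merged";
    }

    // Patch approach: include the spill id so each spill writes a unique file.
    static String pathWithSpillId(int srcTaskId, int spillId) {
        return "attempt_X_" + srcTaskId + "_spill_" + spillId + ".out.merged";
    }

    public static void main(String[] args) {
        Set<String> withoutSpillId = new HashSet<>();
        Set<String> withSpillId = new HashSet<>();
        int srcTaskId = 404;
        // Simulate three pipelined spills from the same source task.
        for (int spillId = 0; spillId < 3; spillId++) {
            withoutSpillId.add(pathWithoutSpillId(srcTaskId));
            withSpillId.add(pathWithSpillId(srcTaskId, spillId));
        }
        System.out.println(withoutSpillId.size()); // collapses to 1 path
        System.out.println(withSpillId.size());    // 3 distinct paths
    }
}
```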

> Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task
> -------------------------------------------------------------------------
>
>                 Key: TEZ-2358
>                 URL: https://issues.apache.org/jira/browse/TEZ-2358
>             Project: Apache Tez
>          Issue Type: Bug
>    Affects Versions: 0.7.0
>            Reporter: Gopal V
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2358.1.patch, 
> syslog_attempt_1429683757595_0141_1_01_000143_0.syslog.bz2
>
>
> The Tez MergeManager code assumes that the src-task-id is unique between 
> merge operations, this results in some confusion when two merge sequences 
> have to process output from the same src-task-id.
> {code}
> private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
>                                           List<MapOutput> inMemoryMapOutputs,
>                                           List<FileChunk> onDiskMapOutputs
> ...
>   if (inMemoryMapOutputs.size() > 0) {
>     int srcTaskId =
>         inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
> ...
>     // must spill to disk, but can't retain in-mem for intermediate merge
>     final Path outputPath =
>         mapOutputFile.getInputFileForWrite(srcTaskId, inMemToDiskBytes)
>             .suffix(Constants.MERGED_OUTPUT_PREFIX);
> ...
> {code}
> This, or a related scenario, results in the following FileChunks list, which 
> contains identically named paths with different lengths.
> {code}
> 2015-04-23 03:28:50,983 INFO [MemtoDiskMerger [Map_1]] 
> orderedgrouped.MergeManager: Initiating in-memory merge with 6 segments...
> 2015-04-23 03:28:50,987 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: 
> Merging 6 sorted segments
> 2015-04-23 03:28:50,988 INFO [MemtoDiskMerger [Map_1]] impl.TezMerger: Down 
> to the last merge-pass, with 6 segments left of total size: 1165944755 bytes
> 2015-04-23 03:28:58,495 INFO [MemtoDiskMerger [Map_1]] 
> orderedgrouped.MergeManager: attempt_1429683757595_0141_1_01_000143_0_10027 
> Merge of the 6 files in-memory complete. Local file is 
> /grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out.merged
>  of size 785583965
> 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
> orderedgrouped.MergeManager: finalMerge called with 0 in-memory map-outputs 
> and 5 on-disk map-outputs
> 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
> orderedgrouped.MergeManager: GOPAL: onDiskBytes = 365232290 += 
> 365232290for/grid/4/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_1023.out
> 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
> orderedgrouped.MergeManager: GOPAL: onDiskBytes = 730529899 += 
> 365297609for/grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
> 2015-04-23 03:28:58,496 INFO [ShuffleAndMergeRunner [Map_1]] 
> orderedgrouped.MergeManager: GOPAL: onDiskBytes = 1095828683 += 
> 365298784for/grid/5/cluster/yarn/local/usercache/gopal/appcache/application_1429683757595_0141/attempt_1429683757595_0141_1_01_000143_0_10027_spill_404.out
> {code}
> The multiple instances of 404.out indicate that we pulled pipelined chunks 
> of the same shuffle src id more than once: once into memory and twice onto disk.
> {code}
> 2015-04-23 03:28:08,256 INFO 
> [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
> orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
> targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
> cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
> attempt_1429683757595_0141_1_00_000404_0_10009_0, runDuration: 0]
> 2015-04-23 03:28:08,270 INFO 
> [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
> orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
> targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
> cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
> attempt_1429683757595_0141_1_00_000404_0_10009_1, runDuration: 0]
> 2015-04-23 03:28:08,272 INFO 
> [TezTaskEventRouter[attempt_1429683757595_0141_1_01_000143_0]] 
> orderedgrouped.ShuffleInputEventHandlerOrderedGrouped: DME srcIdx: 143, 
> targetIdx: 404, attemptNum: 0, payload: [hasEmptyPartitions: true, host: 
> cn047-10.l42scl.hortonworks.com, port: 13562, pathComponent: 
> attempt_1429683757595_0141_1_00_000404_0_10009_2, runDuration: 0]
> {code}
> This will fail depending on how many times _404_0 is at the top of the 
> FileChunks list in this run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
