[ https://issues.apache.org/jira/browse/SPARK-37675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mridul Muralidharan resolved SPARK-37675.
-----------------------------------------
    Fix Version/s: 3.3.0
                   3.2.2
       Resolution: Fixed

Issue resolved by pull request 35325
[https://github.com/apache/spark/pull/35325]

> Push-based merge finalization bugs in the RemoteBlockPushResolver
> -----------------------------------------------------------------
>
>                 Key: SPARK-37675
>                 URL: https://issues.apache.org/jira/browse/SPARK-37675
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: Cheng Pan
>            Assignee: Chandni Singh
>            Priority: Major
>             Fix For: 3.3.0, 3.2.2
>
>
> We identified three issues in the handling of merge finalization requests in the RemoteBlockPushResolver:
>
> 1. Empty merge data
>
> If the shuffle is finalized while a reduce partition is still receiving its first block, then when the merger finalizes that partition, its data file ends up empty: the file is truncated to the last good position, which is 0 in this case. Even though no data exists for the reducer, it is still added to the result (the merged reducerIds).
>
> 2. Overwriting the merged data file of a reduce partition after it is finalized
>
> This is a more involved issue that requires a specific sequence of events, and it starts with how the check for a {{too late block}} is done [here|https://github.com/apache/spark/blob/50758ab1a3d6a5f73a2419149a1420d103930f77/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L180].
> The example below gives more details, but in a nutshell the following can happen for a DETERMINATE shuffle:
> # Merge starts, blocks are accepted.
> # Merge is finalized.
> ** Files are closed, status is reported to the driver, and appShuffleInfo.shuffles is cleaned up.
> # A late block push from an executor is received.
> ** The request is for a reducer for which the merger never received any data until then, so there are no on-disk files.
> ** The check does not catch this case, so we end up (re-)starting the merge.
> # The executor can now push blocks for reducers that were finalized earlier.
> ** Files are truncated.
> # Reads will see inconsistent state due to the ongoing writes.
>
> Explaining this with an example for a DETERMINATE shuffleId 1, shuffleMergeId 0, and reduce partitions 100 and 200:
> # shufflePush_1_0_0_100 is received by the RemoteBlockPushResolver.
> ## No meta information existed for shuffle 1, so the shuffle service creates an AppShuffleMergePartitionsInfo for shuffle 1 and shuffleMergeId 0 to start the merge.
> ## The merge starts in the RemoteBlockPushResolver, which creates the data file for the merge request, shuffleMerged_$APP_ID_1_0_100.data (along with the index/meta files).
> # A FinalizeShuffleMerge message for shuffleId 1 and shuffleMergeId 0 is received by the RemoteBlockPushResolver. In a thread-safe manner:
> ## The AppShuffleMergePartitionsInfo for shuffle 1 is removed from the in-memory map.
> ## The shuffleMerged_$APP_ID_1_0_100.data/index/meta files are closed.
> ## The driver is informed that partition 100 of shuffleId 1/mergeId 0 was merged.
> # shufflePush_1_0_0_200 is received by the RemoteBlockPushResolver.
> ## A new AppShuffleMergePartitionsInfo is added since:
> ### There is no AppShuffleMergePartitionsInfo for shuffle 1/mergeId 0, as it was removed during finalization, and
> ### The merger had never received data for partition 200 until then.
> ## With this, shuffleMerged…200.data is created, and on that merger, the merge for shuffleId 1/mergeId 0 starts again.
> # shufflePush_1_0_5_100 is received by the RemoteBlockPushResolver. The order of pushes is randomized, so late pushes from an executor can end up pushing data for reducer 200 followed by data for reducer 100.
> ## The AppShuffleMergePartitionsInfo created for shuffle 1 and shuffleMergeId 0 in step 3.1 doesn't have reduce id 100, so the data/index/meta files for this partition will be recreated. Reference code.
>
> 3.
Throwing an exception in the finalization of a shuffle for which the shuffle server didn't receive any blocks.
>
> For very small stages with a low minCompletedPushRatio/minShuffleSizeToWait, the driver can initiate the finalization of a shuffle right away. The shuffle server may not have received any pushed blocks, so there will be no {{AppShuffleMergePartitionsInfo}} instance corresponding to the shuffle in its state. In this case, we should mark the shuffle as finalized and return empty results.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
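The intended finalization behavior for issues 1 and 3 above can be sketched as a minimal, self-contained model (the class and method names here are hypothetical, not the real RemoteBlockPushResolver API): skip reduce partitions whose merged file was truncated back to zero bytes, and return an empty result, rather than throwing, when no merge state exists for the shuffle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of shuffle-merge finalization state.
class MergeFinalizer {
    // shuffleId -> (reduceId -> successfully merged bytes for that partition)
    private final Map<Integer, Map<Integer, Long>> shuffleToPartitionBytes = new HashMap<>();

    void recordMergedBytes(int shuffleId, int reduceId, long bytes) {
        shuffleToPartitionBytes
            .computeIfAbsent(shuffleId, k -> new HashMap<>())
            .merge(reduceId, bytes, Long::sum);
    }

    // Finalize a shuffle and return the merged reduce ids.
    // Issue 3: if the server never received any block for this shuffle,
    //   return an empty result instead of throwing.
    // Issue 1: skip partitions whose merged data was truncated back to 0 bytes,
    //   so empty partitions are never reported as merged.
    List<Integer> finalizeShuffle(int shuffleId) {
        Map<Integer, Long> partitions = shuffleToPartitionBytes.remove(shuffleId);
        if (partitions == null) {
            return new ArrayList<>();   // no state at all: empty result, no exception
        }
        List<Integer> mergedReduceIds = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : partitions.entrySet()) {
            if (e.getValue() > 0) {     // drop empty (fully truncated) partitions
                mergedReduceIds.add(e.getKey());
            }
        }
        return mergedReduceIds;
    }
}
```

With this shape, finalizing a shuffle the server never heard about yields an empty list, and a partition whose only block never completed (0 merged bytes) is excluded from the reported reducerIds.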
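The stricter "too late block" check needed for issue 2 can be sketched the same way (again with hypothetical names): by tracking finalized shuffle merges explicitly, a late push for a reducer the merger never saw is rejected instead of silently restarting the merge and truncating finalized files.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the "too late block push" check.
class LateBlockGuard {
    // shuffleId -> reduceIds for which a merge is currently active
    private final Map<Integer, Set<Integer>> activeMerges = new HashMap<>();
    // shuffleIds whose merge has already been finalized
    private final Set<Integer> finalizedShuffles = new HashSet<>();

    // Returns true if the pushed block should be accepted for merging.
    // The flaw described in issue 2 is a check that only consults the active
    // merge state: after finalization that state is gone, so a late push for
    // an unseen reducer looked like a fresh merge and was accepted. Checking
    // the finalized set first rejects all pushes after finalization.
    boolean acceptBlockPush(int shuffleId, int reduceId) {
        if (finalizedShuffles.contains(shuffleId)) {
            return false;   // too late: merge already finalized
        }
        activeMerges.computeIfAbsent(shuffleId, k -> new HashSet<>()).add(reduceId);
        return true;
    }

    void finalizeShuffle(int shuffleId) {
        activeMerges.remove(shuffleId);
        finalizedShuffles.add(shuffleId);
    }
}
```

In the walkthrough above, the late shufflePush_1_0_0_200 arriving after FinalizeShuffleMerge would be rejected under this check, so the finalized shuffleMerged files for partition 100 are never recreated or truncated.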