[ https://issues.apache.org/jira/browse/SPARK-41792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653208#comment-17653208 ]
Apache Spark commented on SPARK-41792:
--------------------------------------

User 'mridulm' has created a pull request for this issue:
https://github.com/apache/spark/pull/39316

> Shuffle merge finalization removes the wrong finalization state from the DB
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-41792
>                 URL: https://issues.apache.org/jira/browse/SPARK-41792
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.3.0, 3.4.0
>            Reporter: Mridul Muralidharan
>            Priority: Minor
>
> During `finalizeShuffleMerge` in the external shuffle service, if the
> finalization request is for a newer shuffle merge id, we clean up the
> existing (older) shuffle details and add the newer entry (for which no
> blocks were pushed) to the DB.
> Unfortunately, when cleaning up the DB, we use the wrong
> AppAttemptShuffleMergeId: we remove the entry for the latest shuffle merge
> id instead of the existing (older) entry.
>
> Proposed Fix:
> {code}
> diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> index 816d1082850..551104d0eba 100644
> --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
> @@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
>        } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
>          // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
>          // empty MergeStatuses but cleanup the older shuffleMergeId files.
> +        AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
> +            msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
>          submitCleanupTask(() ->
>              closeAndDeleteOutdatedPartitions(
> -                appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
> +                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
>        } else {
>          // This block covers:
>          // 1. finalization of determinate stage
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
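For illustration, here is a minimal Java sketch of the DB bookkeeping described in the issue, under simplifying assumptions: the DB is modeled as an in-memory Set, and FinalizeMergeSketch, MergeKey and finalizeNewerMerge are hypothetical names, not Spark APIs. It also assumes the asynchronous cleanup runs after the newer entry has been written. With the key used before the fix, the newer entry is removed and the stale one survives; with the key from the proposed fix, only the older entry is removed.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Simplified model of the DB bookkeeping described in SPARK-41792.
// All names here are hypothetical; this is not Spark's RemoteBlockPushResolver.
class FinalizeMergeSketch {

    // Hypothetical stand-in for AppAttemptShuffleMergeId: one merge of one shuffle.
    static final class MergeKey {
        final String appId;
        final int appAttemptId;
        final int shuffleId;
        final int shuffleMergeId;

        MergeKey(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
            this.appId = appId;
            this.appAttemptId = appAttemptId;
            this.shuffleId = shuffleId;
            this.shuffleMergeId = shuffleMergeId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MergeKey)) return false;
            MergeKey k = (MergeKey) o;
            return appAttemptId == k.appAttemptId && shuffleId == k.shuffleId
                && shuffleMergeId == k.shuffleMergeId && appId.equals(k.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, appAttemptId, shuffleId, shuffleMergeId);
        }
    }

    // Finalization of a merge id newer than the one currently tracked.
    // buggy=true removes using the requested (newer) id, as described in the issue;
    // buggy=false uses the current (older) id, as in the proposed fix.
    static void finalizeNewerMerge(Set<MergeKey> db, String appId, int appAttemptId,
                                   int shuffleId, int currentMergeId,
                                   int requestedMergeId, boolean buggy) {
        MergeKey requested = new MergeKey(appId, appAttemptId, shuffleId, requestedMergeId);
        MergeKey current = new MergeKey(appId, appAttemptId, shuffleId, currentMergeId);
        // Record the newer merge id, for which no blocks were pushed.
        db.add(requested);
        // Remove the outdated state. Assumption: the asynchronous cleanup task
        // runs after the new entry is written; here it simply runs in sequence.
        db.remove(buggy ? requested : current);
    }

    public static void main(String[] args) {
        for (boolean buggy : new boolean[] {true, false}) {
            Set<MergeKey> db = new HashSet<>();
            db.add(new MergeKey("app-1", 1, 0, 1)); // existing entry: merge id 1
            finalizeNewerMerge(db, "app-1", 1, 0, 1, 2, buggy);
            System.out.println((buggy ? "buggy: " : "fixed: ")
                + "has mergeId 1=" + db.contains(new MergeKey("app-1", 1, 0, 1))
                + ", has mergeId 2=" + db.contains(new MergeKey("app-1", 1, 0, 2)));
        }
    }
}
```

Running main prints:

    buggy: has mergeId 1=true, has mergeId 2=false
    fixed: has mergeId 1=false, has mergeId 2=true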