Mridul Muralidharan created SPARK-41792:
-------------------------------------------

             Summary: Shuffle merge finalization removes the wrong finalization state from the DB
                 Key: SPARK-41792
                 URL: https://issues.apache.org/jira/browse/SPARK-41792
             Project: Spark
          Issue Type: Bug
          Components: Shuffle
    Affects Versions: 3.3.0, 3.4.0
            Reporter: Mridul Muralidharan


During `finalizeShuffleMerge` in the external shuffle service, if the 
finalization request is for a newer shuffle merge id, we clean up the existing 
(older) shuffle details and add the newer entry (for which no blocks were 
pushed) to the DB.

Unfortunately, when cleaning up from the DB, we use the wrong 
AppAttemptShuffleMergeId: we remove the entry for the latest shuffle merge id 
instead of the existing (older) entry.
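
To make the effect concrete, here is a minimal, self-contained sketch of the key mix-up. It is not the Spark code: `MergeKey` is a hypothetical stand-in for AppAttemptShuffleMergeId, a plain in-memory set stands in for the DB, and the sketch assumes the newer entry is written before the cleanup runs (it ignores the asynchronous cleanup task entirely).

```java
import java.util.HashSet;
import java.util.Set;

public class FinalizeCleanupSketch {
    // Hypothetical stand-in for AppAttemptShuffleMergeId.
    record MergeKey(String appId, int attemptId, int shuffleId, int shuffleMergeId) {}

    // Models finalization for a newer shuffle merge id and returns the
    // resulting "DB" contents. The set is an assumption for illustration;
    // the real service persists this state differently.
    static Set<MergeKey> finalizeNewer(MergeKey existing, MergeKey incoming, boolean useFix) {
        Set<MergeKey> db = new HashSet<>();
        db.add(existing);   // older entry already recorded
        db.add(incoming);   // newer entry, for which no blocks were pushed

        // Buggy path: cleanup is keyed by the incoming (newer) merge id.
        // Fixed path: cleanup key is rebuilt from the existing (older) merge id.
        MergeKey toRemove = useFix
            ? new MergeKey(incoming.appId(), incoming.attemptId(),
                           incoming.shuffleId(), existing.shuffleMergeId())
            : incoming;
        db.remove(toRemove);
        return db;
    }

    public static void main(String[] args) {
        MergeKey older = new MergeKey("app-1", 1, 0, 0);
        MergeKey newer = new MergeKey("app-1", 1, 0, 1);
        System.out.println("buggy: " + finalizeNewer(older, newer, false));
        System.out.println("fixed: " + finalizeNewer(older, newer, true));
    }
}
```

Under these assumptions, the buggy path leaves only the stale older entry behind and drops the newer one, while the fixed path leaves only the newer entry.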

Proposed Fix:

{code}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..551104d0eba 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+                  msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
           submitCleanupTask(() ->
               closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+                  currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
{code}


