This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6b69fbc46 [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received
5d6b69fbc46 is described below

commit 5d6b69fbc46ea52852dbe86f144ab87495af64a8
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Sun Jan 1 13:41:35 2023 -0600

    [SPARK-41792][SHUFFLE] Fix DB update for push based shuffle when newer shuffle merge is received
    
    ### What changes were proposed in this pull request?
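    When a newer shuffle merge id is received, the wrong merge id is removed from the DB: the
    cleanup of outdated partitions is given the newly received shuffle merge id instead of the
    previous (outdated) one. This affects both the handling of pushed blocks and
    finalizeShuffleMerge.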
    
    ### Why are the changes needed?
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    No, fixes a corner case bug
    
    ### How was this patch tested?
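    Unit tests updated: RemoteBlockPushResolverSuite now asserts which shuffle merge id is
    cleaned up, and the expected outdated finalized shuffle count in YarnShuffleServiceSuite
    is corrected.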
    
    Closes #39316 from mridulm/SPARK-41792.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java      | 18 ++++++++++--------
 .../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++++++++++++++++++
 .../spark/network/yarn/YarnShuffleServiceSuite.scala  |  2 +-
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 816d1082850..c3a2e9a883a 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -227,15 +227,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
             // Higher shuffleMergeId seen for the shuffle ID meaning new stage 
attempt is being
             // run for the shuffle ID. Close and clean up old shuffleMergeId 
files,
             // happens in the indeterminate stage retries
-            AppAttemptShuffleMergeId appAttemptShuffleMergeId =
-                new AppAttemptShuffleMergeId(
-                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, 
shuffleMergeId);
+            AppAttemptShuffleMergeId currrentAppAttemptShuffleMergeId =
+                new AppAttemptShuffleMergeId(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+                    shuffleId, latestShuffleMergeId);
             logger.info("{}: creating a new shuffle merge metadata since 
received " +
-                "shuffleMergeId is higher than latest shuffleMergeId {}",
-                appAttemptShuffleMergeId, latestShuffleMergeId);
+                "shuffleMergeId {} is higher than latest shuffleMergeId {}",
+                currrentAppAttemptShuffleMergeId, shuffleMergeId, 
latestShuffleMergeId);
             submitCleanupTask(() ->
-                closeAndDeleteOutdatedPartitions(
-                    appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+                
closeAndDeleteOutdatedPartitions(currrentAppAttemptShuffleMergeId,
+                    mergePartitionsInfo.shuffleMergePartitions));
             return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
           } else {
             // The request is for block with same shuffleMergeId as the latest 
shuffleMergeId
@@ -653,9 +653,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+                  msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
           submitCleanupTask(() ->
               closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+                  currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index eb2c1d9fa5c..6a595ee346d 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -28,7 +28,9 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -1146,11 +1148,14 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testCleanupOlderShuffleMergeId() throws IOException, 
InterruptedException {
     Semaphore closed = new Semaphore(0);
+    List<RemoteBlockPushResolver.AppAttemptShuffleMergeId> removedIds =
+            new CopyOnWriteArrayList<>();
     pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
       void closeAndDeleteOutdatedPartitions(
           AppAttemptShuffleMergeId appAttemptShuffleMergeId,
           Map<Integer, AppShufflePartitionInfo> partitions) {
+        removedIds.add(appAttemptShuffleMergeId);
         super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
partitions);
         closed.release();
       }
@@ -1167,6 +1172,10 @@ public class RemoteBlockPushResolverSuite {
     RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
       pushResolver.validateAndGetAppShuffleInfo(testApp);
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    // For the previous merge id
+    assertEquals(1, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     assertFalse("Data files on the disk should be cleaned up",
       appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
     assertFalse("Meta files on the disk should be cleaned up",
@@ -1186,6 +1195,9 @@ public class RemoteBlockPushResolverSuite {
       pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    assertEquals(2, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     stream3.onComplete(stream3.getID());
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 3));
@@ -1196,6 +1208,9 @@ public class RemoteBlockPushResolverSuite {
       pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    assertEquals(3, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     // Do not finalize shuffleMergeId 4 can happen during stage cancellation.
     stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
     stream4.onComplete(stream4.getID());
@@ -1204,6 +1219,10 @@ public class RemoteBlockPushResolverSuite {
     // but no blocks pushed for that shuffleMergeId
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 5));
     closed.acquire();
+    assertEquals(1, removedIds.size());
+    // For the previous merge id - here the cleanup is from 
finalizeShuffleMerge
+    assertEquals(4, removedIds.iterator().next().shuffleMergeId);
+    removedIds.clear();
     assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 
should be cleaned"
       + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
     assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 
should be cleaned"
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 16fa4205692..075a21c399e 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends 
SparkFunSuite with Matchers {
     assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
       mergeManager2, mergeManager2DB) == 1)
     assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
-      mergeManager2, mergeManager2DB) == 2)
+      mergeManager2, mergeManager2DB) == 1)
     s2.stop()
 
     // Yarn Shuffle service comes back up without custom mergeManager


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to