This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0a98e51  [SPARK-32923][FOLLOW-UP] Clean up older shuffleMergeId 
shuffle files when finalize request for higher shuffleMergeId is received
0a98e51 is described below

commit 0a98e51b84fb3770705cf01d3cfba493f02ea15c
Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
AuthorDate: Wed Aug 4 03:30:08 2021 -0500

    [SPARK-32923][FOLLOW-UP] Clean up older shuffleMergeId shuffle files when 
finalize request for higher shuffleMergeId is received
    
    ### What changes were proposed in this pull request?
    
    Clean up the shuffle files of an older shuffleMergeId when a finalize 
request for a higher shuffleMergeId is received and no blocks were pushed for 
that higher shuffleMergeId. This was identified as part of 
https://github.com/apache/spark/pull/33034#discussion_r680610872.
    
    ### Why are the changes needed?
    
    Without this change, older shuffleMergeId files won't be cleaned up 
properly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added changes to existing unit test to address this case.
    
    Closes #33605 from venkata91/SPARK-32923-follow-on.
    
    Authored-by: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit d8169493b662acac31d3fc5e6c5051917428c974)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java      |  8 +++++++-
 .../network/shuffle/RemoteBlockPushResolverSuite.java | 19 +++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index cc7d4db..9a45f2c 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -513,12 +513,18 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       }
     } else {
       appShuffleInfo.shuffles.compute(msg.shuffleId, (id, value) -> {
-        if (null == value || msg.shuffleMergeId != value.shuffleMergeId ||
+        if (null == value || msg.shuffleMergeId < value.shuffleMergeId ||
           INDETERMINATE_SHUFFLE_FINALIZED == value.shuffleMergePartitions) {
           throw new RuntimeException(String.format(
             "Shuffle merge finalize request for shuffle %s with" + " 
shuffleMergeId %s is %s",
             msg.shuffleId, msg.shuffleMergeId,
             ErrorHandler.BlockPushErrorHandler.STALE_SHUFFLE_FINALIZE_SUFFIX));
+        } else if (msg.shuffleMergeId > value.shuffleMergeId) {
+          // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
+          // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          mergedShuffleCleaner.execute(() ->
+            closeAndDeletePartitionFiles(value.shuffleMergePartitions));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
         } else {
           shuffleMergePartitionsRef.set(value.shuffleMergePartitions);
           return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 46a1569..6bf39c8 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -1234,6 +1234,25 @@ public class RemoteBlockPushResolverSuite {
     pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 3));
     MergedBlockMeta mergedBlockMeta = pushResolver.getMergedBlockMeta(testApp, 
0, 3, 0);
     validateChunks(testApp, 0, 3, 0, mergedBlockMeta, new int[]{2}, new 
int[][]{{0}});
+
+    StreamCallbackWithID stream4 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0));
+    closed.acquire();
+    // Do not finalize shuffleMergeId 4 can happen during stage cancellation.
+    stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2]));
+    stream4.onComplete(stream4.getID());
+
+    // Check whether the data is cleaned up when higher shuffleMergeId 
finalize request comes
+    // but no blocks pushed for that shuffleMergeId
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 5));
+    closed.acquire();
+    assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 
should be cleaned"
+      + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists());
+    assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 
should be cleaned"
+      + " up", appShuffleInfo.getMergedShuffleIndexFile(0, 4, 0).exists());
+    assertFalse("MergedBlock data file for shuffle 0 and shuffleMergeId 4 
should be cleaned"
+      + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists());
   }
 
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) 
throws IOException {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to