This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e191b3f [#2279] improvement(spark): Trigger the upstream rewrite 
when the read stage fails  (#2281)
e7e191b3f is described below

commit e7e191b3ffc9c0ced99e648e1f0aa9eda41336fc
Author: yl09099 <[email protected]>
AuthorDate: Fri Dec 20 14:40:10 2024 +0800

    [#2279] improvement(spark): Trigger the upstream rewrite when the read 
stage fails  (#2281)
    
    ### What changes were proposed in this pull request?
    
    If the current Reader fails to obtain Shuffle data, the upstream Stage is not 
triggered to rewrite the data; likewise, when a Shuffle Server fails, no Stage 
retry is triggered.
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/2279
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT.
---
 .../shuffle/manager/ShuffleManagerGrpcService.java | 77 +++++++++++++++++-----
 .../apache/uniffle/test/RSSStageResubmitTest.java  |  2 +-
 2 files changed, 61 insertions(+), 18 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 667b6a905..c986ae02a 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -122,13 +122,13 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                 
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
                 
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
                 LOG.info(
-                    "Clear shuffle result in shuffleId:{}, stageId:{}, 
stageAttemptNumber:{}.",
+                    "Clear shuffle result in shuffleId:{}, stageId:{}, 
stageAttemptNumber:{} in the write failure phase.",
                     shuffleId,
                     stageAttemptId,
                     stageAttemptNumber);
               } catch (SparkException e) {
                 LOG.error(
-                    "Clear MapoutTracker Meta failed in shuffleId:{}, 
stageAttemptId:{}, stageAttemptNumber:{}.",
+                    "Clear MapoutTracker Meta failed in shuffleId:{}, 
stageAttemptId:{}, stageAttemptNumber:{} in the write failure phase.",
                     shuffleId,
                     stageAttemptId,
                     stageAttemptNumber);
@@ -158,6 +158,7 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
       RssProtos.ReportShuffleFetchFailureRequest request,
       StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> 
responseObserver) {
     String appId = request.getAppId();
+    int shuffleId = request.getShuffleId();
     int stageAttempt = request.getStageAttemptId();
     int partitionId = request.getPartitionId();
     RssProtos.StatusCode code;
@@ -189,18 +190,37 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
         code = RssProtos.StatusCode.INVALID_REQUEST;
         reSubmitWholeStage = false;
       } else { // update the stage partition fetch failure count
-        code = RssProtos.StatusCode.SUCCESS;
-        status.incPartitionFetchFailure(stageAttempt, partitionId);
-        int fetchFailureNum = status.getPartitionFetchFailureNum(stageAttempt, 
partitionId);
-        if (fetchFailureNum >= shuffleManager.getMaxFetchFailures()) {
-          reSubmitWholeStage = true;
-          msg =
-              String.format(
-                  "report shuffle fetch failure as maximum number(%d) of 
shuffle fetch is occurred",
-                  shuffleManager.getMaxFetchFailures());
-        } else {
-          reSubmitWholeStage = false;
-          msg = "don't report shuffle fetch failure";
+        synchronized (status) {
+          code = RssProtos.StatusCode.SUCCESS;
+          status.incPartitionFetchFailure(stageAttempt, partitionId);
+          if (status.currentPartitionIsFetchFailed(stageAttempt, partitionId, 
shuffleManager)) {
+            reSubmitWholeStage = true;
+            if (!status.hasClearedMapTrackerBlock()) {
+              try {
+                // Clear the metadata of the completed task, after the 
upstream ShuffleId is
+                // cleared, the write Stage can be triggered again.
+                shuffleManager.unregisterAllMapOutput(shuffleId);
+                status.clearedMapTrackerBlock();
+                LOG.info(
+                    "Clear shuffle result in shuffleId:{}, stageId:{} in the 
write failure phase.",
+                    shuffleId,
+                    stageAttempt);
+              } catch (SparkException e) {
+                LOG.error(
+                    "Clear MapoutTracker Meta failed in shuffleId:{}, 
stageAttemptId:{} in the write failure phase.",
+                    shuffleId,
+                    stageAttempt);
+                throw new RssException("Clear MapoutTracker Meta failed!", e);
+              }
+            }
+            msg =
+                String.format(
+                    "report shuffle fetch failure as maximum number(%d) of 
shuffle fetch is occurred",
+                    shuffleManager.getMaxFetchFailures());
+          } else {
+            reSubmitWholeStage = false;
+            msg = "don't report shuffle fetch failure";
+          }
         }
       }
     }
@@ -489,10 +509,13 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
     private final ReentrantReadWriteLock.WriteLock writeLock = 
lock.writeLock();
     private final int[] partitions;
     private int stageAttempt;
+    // Whether the Shuffle result has been cleared for the current number of 
attempts.
+    private boolean hasClearedMapTrackerBlock;
 
     private RssShuffleStatus(int partitionNum, int stageAttempt) {
       this.stageAttempt = stageAttempt;
       this.partitions = new int[partitionNum];
+      this.hasClearedMapTrackerBlock = false;
     }
 
     private <T> T withReadLock(Supplier<T> fn) {
@@ -553,16 +576,36 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
           });
     }
 
-    public int getPartitionFetchFailureNum(int stageAttempt, int partition) {
+    public boolean currentPartitionIsFetchFailed(
+        int stageAttempt, int partition, RssShuffleManagerInterface 
shuffleManager) {
       return withReadLock(
           () -> {
             if (this.stageAttempt != stageAttempt) {
-              return 0;
+              return false;
             } else {
-              return this.partitions[partition];
+              if (this.partitions[partition] >= 
shuffleManager.getMaxFetchFailures()) {
+                return true;
+              } else {
+                return false;
+              }
             }
           });
     }
+
+    public void clearedMapTrackerBlock() {
+      withWriteLock(
+          () -> {
+            this.hasClearedMapTrackerBlock = true;
+            return null;
+          });
+    }
+
+    public boolean hasClearedMapTrackerBlock() {
+      return withReadLock(
+          () -> {
+            return hasClearedMapTrackerBlock;
+          });
+    }
   }
 
   @Override
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 0a70e8242..2ea1176ab 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -79,7 +79,7 @@ public class RSSStageResubmitTest extends 
SparkTaskFailureIntegrationTestBase {
     super.updateSparkConfCustomer(sparkConf);
     sparkConf.set(
         RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
-            + RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED,
+            + RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED,
         "true");
   }
 

Reply via email to the mailing list.