This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e7e191b3f [#2279] improvement(spark): Trigger the upstream rewrite
when the read stage fails (#2281)
e7e191b3f is described below
commit e7e191b3ffc9c0ced99e648e1f0aa9eda41336fc
Author: yl09099 <[email protected]>
AuthorDate: Fri Dec 20 14:40:10 2024 +0800
[#2279] improvement(spark): Trigger the upstream rewrite when the read
stage fails (#2281)
### What changes were proposed in this pull request?
Currently, when the reader fails to obtain shuffle data, the upstream stage is not
triggered to rewrite (re-compute) that data; as a result, a shuffle server failure
does not trigger a stage retry.
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/2279
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../shuffle/manager/ShuffleManagerGrpcService.java | 77 +++++++++++++++++-----
.../apache/uniffle/test/RSSStageResubmitTest.java | 2 +-
2 files changed, 61 insertions(+), 18 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 667b6a905..c986ae02a 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -122,13 +122,13 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
LOG.info(
- "Clear shuffle result in shuffleId:{}, stageId:{},
stageAttemptNumber:{}.",
+ "Clear shuffle result in shuffleId:{}, stageId:{},
stageAttemptNumber:{} in the write failure phase.",
shuffleId,
stageAttemptId,
stageAttemptNumber);
} catch (SparkException e) {
LOG.error(
- "Clear MapoutTracker Meta failed in shuffleId:{},
stageAttemptId:{}, stageAttemptNumber:{}.",
+ "Clear MapoutTracker Meta failed in shuffleId:{},
stageAttemptId:{}, stageAttemptNumber:{} in the write failure phase.",
shuffleId,
stageAttemptId,
stageAttemptNumber);
@@ -158,6 +158,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
RssProtos.ReportShuffleFetchFailureRequest request,
StreamObserver<RssProtos.ReportShuffleFetchFailureResponse>
responseObserver) {
String appId = request.getAppId();
+ int shuffleId = request.getShuffleId();
int stageAttempt = request.getStageAttemptId();
int partitionId = request.getPartitionId();
RssProtos.StatusCode code;
@@ -189,18 +190,37 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else { // update the stage partition fetch failure count
- code = RssProtos.StatusCode.SUCCESS;
- status.incPartitionFetchFailure(stageAttempt, partitionId);
- int fetchFailureNum = status.getPartitionFetchFailureNum(stageAttempt,
partitionId);
- if (fetchFailureNum >= shuffleManager.getMaxFetchFailures()) {
- reSubmitWholeStage = true;
- msg =
- String.format(
- "report shuffle fetch failure as maximum number(%d) of
shuffle fetch is occurred",
- shuffleManager.getMaxFetchFailures());
- } else {
- reSubmitWholeStage = false;
- msg = "don't report shuffle fetch failure";
+ synchronized (status) {
+ code = RssProtos.StatusCode.SUCCESS;
+ status.incPartitionFetchFailure(stageAttempt, partitionId);
+ if (status.currentPartitionIsFetchFailed(stageAttempt, partitionId,
shuffleManager)) {
+ reSubmitWholeStage = true;
+ if (!status.hasClearedMapTrackerBlock()) {
+ try {
+ // Clear the metadata of the completed task, after the
upstream ShuffleId is
+ // cleared, the write Stage can be triggered again.
+ shuffleManager.unregisterAllMapOutput(shuffleId);
+ status.clearedMapTrackerBlock();
+ LOG.info(
+ "Clear shuffle result in shuffleId:{}, stageId:{} in the
write failure phase.",
+ shuffleId,
+ stageAttempt);
+ } catch (SparkException e) {
+ LOG.error(
+ "Clear MapoutTracker Meta failed in shuffleId:{},
stageAttemptId:{} in the write failure phase.",
+ shuffleId,
+ stageAttempt);
+ throw new RssException("Clear MapoutTracker Meta failed!", e);
+ }
+ }
+ msg =
+ String.format(
+ "report shuffle fetch failure as maximum number(%d) of
shuffle fetch is occurred",
+ shuffleManager.getMaxFetchFailures());
+ } else {
+ reSubmitWholeStage = false;
+ msg = "don't report shuffle fetch failure";
+ }
}
}
}
@@ -489,10 +509,13 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
private final ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
private final int[] partitions;
private int stageAttempt;
+ // Whether the Shuffle result has been cleared for the current number of
attempts.
+ private boolean hasClearedMapTrackerBlock;
private RssShuffleStatus(int partitionNum, int stageAttempt) {
this.stageAttempt = stageAttempt;
this.partitions = new int[partitionNum];
+ this.hasClearedMapTrackerBlock = false;
}
private <T> T withReadLock(Supplier<T> fn) {
@@ -553,16 +576,36 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
});
}
- public int getPartitionFetchFailureNum(int stageAttempt, int partition) {
+ public boolean currentPartitionIsFetchFailed(
+ int stageAttempt, int partition, RssShuffleManagerInterface
shuffleManager) {
return withReadLock(
() -> {
if (this.stageAttempt != stageAttempt) {
- return 0;
+ return false;
} else {
- return this.partitions[partition];
+ if (this.partitions[partition] >=
shuffleManager.getMaxFetchFailures()) {
+ return true;
+ } else {
+ return false;
+ }
}
});
}
+
+ public void clearedMapTrackerBlock() {
+ withWriteLock(
+ () -> {
+ this.hasClearedMapTrackerBlock = true;
+ return null;
+ });
+ }
+
+ public boolean hasClearedMapTrackerBlock() {
+ return withReadLock(
+ () -> {
+ return hasClearedMapTrackerBlock;
+ });
+ }
}
@Override
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 0a70e8242..2ea1176ab 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -79,7 +79,7 @@ public class RSSStageResubmitTest extends
SparkTaskFailureIntegrationTestBase {
super.updateSparkConfCustomer(sparkConf);
sparkConf.set(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
- + RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED,
+ + RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED,
"true");
}