leixm commented on code in PR #3650:
URL: https://github.com/apache/celeborn/pull/3650#discussion_r3076931095
##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -511,36 +547,32 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                 taskInfo.attemptNumber(),
                 ti.attemptNumber());
             hasRunningAttempt = true;
-          } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-            // For KILLED state task, Spark does not count the number of failures
-            // For UNKNOWN state task, Spark does count the number of failures
-            // For FAILED state task, Spark decides whether to count the failure based on the
-            // different failure reasons. Since we cannot obtain the failure
-            // reason here, we will count all tasks in FAILED state.
-            LOG.info(
-                "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                stageId,
-                taskInfo.index(),
-                taskId,
-                taskInfo.attemptNumber(),
-                ti.attemptNumber(),
-                ti.status());
-            failedTaskAttempts += 1;
           }
         }
       }
       // The following situations should trigger a FetchFailed exception:
-      // 1. If failedTaskAttempts >= maxTaskFails
-      // 2. If no other taskAttempts are running
-      if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+      // 1. If total failures (previous failures + current failure) >= maxTaskFails
+      // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+      //    to keep the same behavior as Spark.
+      // Note: previousFailureCount does NOT include the current failure,
+      //       so (previousFailureCount + 1) represents the total failure count.
+      int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+      // Fail-safe: if failure count cannot be determined, conservatively trigger
+      // FetchFailed to avoid silently swallowing the error.
+      if (previousFailureCount < 0) {
+        return true;
+      }
Review Comment:
I think this is unreasonable. It shouldn't check `!hasRunningAttempt` here; it
should directly return `true`. Otherwise `FetchFailed` won't be triggered and
the app will fail.
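For context, the decision logic described by the hunk's comments can be sketched as a small standalone method. The names (`previousFailureCount`, `maxTaskFails`, `hasRunningAttempt`) mirror the diff, but this is an illustrative sketch of the described behavior, not the exact PR code:

```java
// Illustrative sketch of the FetchFailed decision described in the diff
// comments; names and thresholds are taken from the hunk, not verbatim code.
public class FetchFailureDecision {
  public static boolean shouldReportFetchFailure(
      int previousFailureCount, int maxTaskFails, boolean hasRunningAttempt) {
    // Fail-safe: unknown failure count -> conservatively report FetchFailed
    // rather than silently swallowing the error.
    if (previousFailureCount < 0) {
      return true;
    }
    // previousFailureCount excludes the current failure, so the total is +1.
    int totalFailures = previousFailureCount + 1;
    // Report when the task would exhaust its retries, or when no other
    // attempt is still running (to keep the same behavior as Spark).
    return totalFailures >= maxTaskFails || !hasRunningAttempt;
  }

  public static void main(String[] args) {
    System.out.println(shouldReportFetchFailure(-1, 4, true));  // fail-safe path
    System.out.println(shouldReportFetchFailure(3, 4, true));   // retries exhausted
    System.out.println(shouldReportFetchFailure(0, 4, false));  // no running attempt
    System.out.println(shouldReportFetchFailure(0, 4, true));   // keep waiting
  }
}
```

With the default `spark.task.maxFailures = 4`, the first three calls report `FetchFailed` and the last does not, which matches the two trigger conditions listed in the hunk.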
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -375,36 +411,32 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                 taskInfo.attemptNumber(),
                 ti.attemptNumber());
             hasRunningAttempt = true;
-          } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-            // For KILLED state task, Spark does not count the number of failures
-            // For UNKNOWN state task, Spark does count the number of failures
-            // For FAILED state task, Spark decides whether to count the failure based on the
-            // different failure reasons. Since we cannot obtain the failure
-            // reason here, we will count all tasks in FAILED state.
-            logger.info(
-                "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                stageId,
-                taskInfo.index(),
-                taskId,
-                taskInfo.attemptNumber(),
-                ti.attemptNumber(),
-                ti.status());
-            failedTaskAttempts += 1;
          }
         }
       }
       // The following situations should trigger a FetchFailed exception:
-      // 1. If failedTaskAttempts >= maxTaskFails
-      // 2. If no other taskAttempts are running
-      if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+      // 1. If total failures (previous failures + current failure) >= maxTaskFails
+      // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+      //    to keep the same behavior as Spark.
+      // Note: previousFailureCount does NOT include the current failure,
+      //       so (previousFailureCount + 1) represents the total failure count.
+      int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+      // Fail-safe: if failure count cannot be determined, conservatively trigger
+      // FetchFailed to avoid silently swallowing the error.
+      if (previousFailureCount < 0) {
+        return true;
Review Comment:
I think this is unreasonable. It shouldn't check `!hasRunningAttempt` here; it
should directly return `true`. Otherwise `FetchFailed` won't be triggered and
the app will fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]