leixm commented on code in PR #3650:
URL: https://github.com/apache/celeborn/pull/3650#discussion_r3076931095


##########
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -511,36 +547,32 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
               hasRunningAttempt = true;
-            } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-              // For KILLED state task, Spark does not count the number of failures
-              // For UNKNOWN state task, Spark does count the number of failures
-              // For FAILED state task, Spark decides whether to count the failure based on the
-              // different failure reasons. Since we cannot obtain the failure
-              // reason here, we will count all tasks in FAILED state.
-              LOG.info(
-                  "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                  stageId,
-                  taskInfo.index(),
-                  taskId,
-                  taskInfo.attemptNumber(),
-                  ti.attemptNumber(),
-                  ti.status());
-              failedTaskAttempts += 1;
             }
           }
         }
         // The following situations should trigger a FetchFailed exception:
-        //  1. If failedTaskAttempts >= maxTaskFails
-        //  2. If no other taskAttempts are running
-        if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+        //  1. If total failures (previous failures + current failure) >= maxTaskFails
+        //  2. If no other taskAttempts are running, trigger a FetchFailed exception
+        //  to keep the same behavior as Spark.
+        // Note: previousFailureCount does NOT include the current failure,
+        //       so (previousFailureCount + 1) represents the total failure count.
+        int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+        // Fail-safe: if failure count cannot be determined, conservatively trigger
+        // FetchFailed to avoid silently swallowing the error.
+        if (previousFailureCount < 0) {
+          return true;
+        }

Review Comment:
   I think this is unreasonable. It shouldn't check `!hasRunningAttempt`, but 
should directly return `true`. Otherwise, `FetchFailed` won't be triggered and 
the app will fail.
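
For reference, a minimal sketch of the decision order described by the comments in the hunk above. It is not the PR's code: the class `FetchFailureDecision` and method `shouldReport` are hypothetical names, and the final combined condition is not visible in the quoted diff, so it is inferred from the two numbered comment lines. The negative sentinel for "count unknown" follows the `previousFailureCount < 0` check in the diff.

```java
// Hypothetical helper, for illustration only; not part of SparkUtils.
final class FetchFailureDecision {

  /**
   * @param previousFailureCount failures recorded before the current one;
   *                             a negative value means the count is unknown
   * @param maxTaskFails         maximum failures allowed for one task
   * @param hasRunningAttempt    whether another attempt of the task is running
   */
  static boolean shouldReport(
      int previousFailureCount, int maxTaskFails, boolean hasRunningAttempt) {
    // Fail-safe: an unknown count reports FetchFailed unconditionally,
    // without consulting hasRunningAttempt.
    if (previousFailureCount < 0) {
      return true;
    }
    // The current failure is not included in previousFailureCount, so add 1.
    if (previousFailureCount + 1 >= maxTaskFails) {
      return true;
    }
    // Assumed from comment item 2: report when no other attempt is running.
    return !hasRunningAttempt;
  }

  public static void main(String[] args) {
    System.out.println(shouldReport(-1, 4, true));
    System.out.println(shouldReport(0, 4, true));
  }
}
```

In this sketch the unknown-count branch returns before `hasRunningAttempt` is read, which is the ordering the comment above asks for.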



##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -375,36 +411,32 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
                   taskInfo.attemptNumber(),
                   ti.attemptNumber());
               hasRunningAttempt = true;
-            } else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
-              // For KILLED state task, Spark does not count the number of failures
-              // For UNKNOWN state task, Spark does count the number of failures
-              // For FAILED state task, Spark decides whether to count the failure based on the
-              // different failure reasons. Since we cannot obtain the failure
-              // reason here, we will count all tasks in FAILED state.
-              logger.info(
-                  "StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
-                  stageId,
-                  taskInfo.index(),
-                  taskId,
-                  taskInfo.attemptNumber(),
-                  ti.attemptNumber(),
-                  ti.status());
-              failedTaskAttempts += 1;
             }
           }
         }
         // The following situations should trigger a FetchFailed exception:
-        //  1. If failedTaskAttempts >= maxTaskFails
-        //  2. If no other taskAttempts are running
-        if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+        //  1. If total failures (previous failures + current failure) >= maxTaskFails
+        //  2. If no other taskAttempts are running, trigger a FetchFailed exception
+        //  to keep the same behavior as Spark.
+        // Note: previousFailureCount does NOT include the current failure,
+        //       so (previousFailureCount + 1) represents the total failure count.
+        int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+        // Fail-safe: if failure count cannot be determined, conservatively trigger
+        // FetchFailed to avoid silently swallowing the error.
+        if (previousFailureCount < 0) {
+          return true;

Review Comment:
   I think this is unreasonable. It shouldn't check `!hasRunningAttempt`, but 
should directly return `true`. Otherwise, `FetchFailed` won't be triggered and 
the app will fail.


