This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new f281d376f [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
f281d376f is described below

commit f281d376fd31e2b27e2a81488d60478e8646f552
Author: SteNicholas <[email protected]>
AuthorDate: Fri Sep 6 17:25:13 2024 +0800

    [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
    
    ### What changes were proposed in this pull request?
    
    `InFlightRequestTracker` should not reset `totalInflightReqs` during cleanup, to avoid a negative `totalInflightReqs` in `limitZeroInFlight`.
    
    Follow up #2621.
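    The change can be illustrated with a minimal sketch (hypothetical class and method names, not the actual Celeborn code): `cleanup` sets a volatile `cleaned` flag instead of resetting the `LongAdder`, so a late `removeBatch` can no longer drive the counter negative, and the wait loop bails out once cleanup has happened.

    ```java
    import java.util.concurrent.atomic.LongAdder;

    // Minimal sketch of the fix; InFlightCounterSketch is a hypothetical name.
    public class InFlightCounterSketch {
      private final LongAdder totalInflightReqs = new LongAdder();
      private volatile boolean cleaned = false;

      public void addBatch() { totalInflightReqs.increment(); }

      public void removeBatch() { totalInflightReqs.decrement(); }

      // Waits until no requests are in flight; returns true on timeout.
      public boolean limitZeroInFlight(long times, long deltaMs) {
        while (times > 0) {
          if (cleaned) {
            // Push state was already cleaned up by mapperEnd; stop waiting.
            return false;
          }
          if (totalInflightReqs.sum() == 0) {
            return false;
          }
          try {
            Thread.sleep(deltaMs);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
          }
          times--;
        }
        return true; // timed out
      }

      public void cleanup() {
        cleaned = true; // flag instead of totalInflightReqs.reset()
      }
    }
    ```

    With the flag, a `limitZeroInFlight` call racing with `cleanup` returns promptly instead of timing out after the configured wait.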
    
    ### Why are the changes needed?
    
    After #2621, there is a common case in which attempt 0 succeeds in writing and the Celeborn client successfully calls mapperEnd, but the attempt is then marked as failed due to an exception such as `ExecutorLostFailure`. Meanwhile, another attempt is rerun and cleans up `pushState` because the mapper has already ended. This case causes the `Waiting timeout for task %s while limiting zero in-flight requests` exception in `limitZeroInFlight`. Therefore, `InFlightRequestTracker` could not reset `totalInflightReqs` f [...]
    
    
![image](https://github.com/user-attachments/assets/3b66d42e-5d6a-411f-8c3a-360349897ab7)
    
    ```
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.36.10:9092, creating a new one.
    24/09/05 08:27:04 [data-client-5-1] WARN InFlightRequestTracker: BatchIdSet 
of 172.27.164.39:9092 is null.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.168.38:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.36.31:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.160.19:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.32.31:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.44.32:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.168.18:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.172.18:9092, creating a new one.
    24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] INFO TransportClientFactory: Found inactive connection to 
/172.27.164.19:9092, creating a new one.
    24/09/05 08:27:15 [dispatcher-Executor] INFO Executor: Executor is trying 
to kill task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
    24/09/05 08:27:15 [Executor task launch worker for task 706.1 in stage 1.0 
(TID 1203)] INFO Executor: Executor interrupted and killed task 706.1 in stage 
1.0 (TID 1203), reason: another attempt succeeded
    24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there 
are still [] in flight, which exceeds the current limit 0.
    24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 
(TID 1205)] ERROR Executor: Exception in task 715.2 in stage 1.0 (TID 1205)
    org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout 
for task 1-715-2 while limiting zero in-flight requests
            at 
org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
            at 
org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
            at 
org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
            at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
            at 
org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:144)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
            at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    24/09/05 08:49:21 [dispatcher-Executor] INFO 
YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
    24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] WARN 
ShuffleClientImpl: Shuffle client has been shutdown!
    24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO 
MemoryStore: MemoryStore cleared
    24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO 
BlockManager: BlockManager stopped
    24/09/05 08:49:21 [pool-5-thread-1] INFO ShutdownHookManager: Shutdown hook 
called
    ```
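    The timeout above ("there are still [] in flight") matches a negative counter: the per-address batch map is empty, yet `totalInflightReqs.sum()` never returns to zero. A hypothetical minimal reproduction of how resetting a `LongAdder` races with a late decrement:

    ```java
    import java.util.concurrent.atomic.LongAdder;

    // Hypothetical reproduction of the failure mode: resetting the counter
    // during cleanup while a decrement for an already-counted request is
    // still outstanding leaves the sum negative, so a "wait until sum == 0"
    // loop can never terminate.
    public class NegativeCounterDemo {
      public static void main(String[] args) {
        LongAdder totalInflightReqs = new LongAdder();
        totalInflightReqs.increment(); // push request sent and counted
        totalInflightReqs.reset();     // cleanup() resets the counter to 0
        totalInflightReqs.decrement(); // late response for the earlier request
        System.out.println(totalInflightReqs.sum()); // prints -1
      }
    }
    ```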
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2725 from SteNicholas/CELEBORN-1506.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 5875f6871fc5bfa2483cf35b1a1968c38c6dea0b)
    Signed-off-by: Shuang <[email protected]>
---
 .../common/write/InFlightRequestTracker.java       | 67 +++++++++++++---------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index 9350c09ef..f46a75650 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -49,6 +49,8 @@ public class InFlightRequestTracker {
   private final int maxInFlightReqsTotal;
   private final LongAdder totalInflightReqs = new LongAdder();
 
+  private volatile boolean cleaned = false;
+
   public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
     this.waitInflightTimeoutMs = conf.clientPushLimitInFlightTimeoutMs();
     this.delta = conf.clientPushLimitInFlightSleepDeltaMs();
@@ -67,11 +69,10 @@ public class InFlightRequestTracker {
 
   public void removeBatch(int batchId, String hostAndPushPort) {
     Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
-    // TODO: Need to debug why batchIdSet will be null.
     if (batchIdSet != null) {
       batchIdSet.remove(batchId);
     } else {
-      logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
+      logger.info("Batches of {} in flight is null.", hostAndPushPort);
     }
     totalInflightReqs.decrement();
   }
@@ -97,19 +98,24 @@ public class InFlightRequestTracker {
     pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
     int currentMaxReqsInFlight = pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);
 
-    Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+    Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
     long times = waitInflightTimeoutMs / delta;
     try {
       while (times > 0) {
-        if (totalInflightReqs.sum() <= maxInFlightReqsTotal
-            && batchIdSet.size() <= currentMaxReqsInFlight) {
-          break;
-        }
-        if (pushState.exception.get() != null) {
-          throw pushState.exception.get();
+        if (cleaned) {
+          // MapEnd cleans up push state, which does not exceed the max requests in flight limit.
+          return false;
+        } else {
+          if (totalInflightReqs.sum() <= maxInFlightReqsTotal
+              && batchIdSet.size() <= currentMaxReqsInFlight) {
+            break;
+          }
+          if (pushState.exception.get() != null) {
+            throw pushState.exception.get();
+          }
+          Thread.sleep(delta);
+          times--;
         }
-        Thread.sleep(delta);
-        times--;
       }
     } catch (InterruptedException e) {
       pushState.exception.set(new CelebornIOException(e));
@@ -118,10 +124,12 @@ public class InFlightRequestTracker {
     if (times <= 0) {
       logger.warn(
           "After waiting for {} ms, "
-              + "there are still {} batches in flight "
-              + "for hostAndPushPort {}, "
+              + "there are still {} requests in flight (limit: {}): "
+              + "{} batches for hostAndPushPort {}, "
               + "which exceeds the current limit {}.",
           waitInflightTimeoutMs,
+          totalInflightReqs.sum(),
+          maxInFlightReqsTotal,
           batchIdSet.size(),
           hostAndPushPort,
           currentMaxReqsInFlight);
@@ -142,14 +150,19 @@ public class InFlightRequestTracker {
 
     try {
       while (times > 0) {
-        if (totalInflightReqs.sum() == 0) {
-          break;
+        if (cleaned) {
+          // MapEnd cleans up push state, which does not exceed the zero requests in flight limit.
+          return false;
+        } else {
+          if (totalInflightReqs.sum() == 0) {
+            break;
+          }
+          if (pushState.exception.get() != null) {
+            throw pushState.exception.get();
+          }
+          Thread.sleep(delta);
+          times--;
         }
-        if (pushState.exception.get() != null) {
-          throw pushState.exception.get();
-        }
-        Thread.sleep(delta);
-        times--;
       }
     } catch (InterruptedException e) {
       pushState.exception.set(new CelebornIOException(e));
@@ -158,9 +171,10 @@ public class InFlightRequestTracker {
     if (times <= 0) {
       logger.error(
           "After waiting for {} ms, "
-              + "there are still {} in flight, "
+              + "there are still {} requests in flight: {}, "
               + "which exceeds the current limit 0.",
           waitInflightTimeoutMs,
+          totalInflightReqs.sum(),
           inflightBatchesPerAddress.entrySet().stream()
               .filter(c -> !c.getValue().isEmpty())
              .map(c -> c.getValue().size() + " batches for hostAndPushPort " + c.getKey())
@@ -179,11 +193,12 @@ public class InFlightRequestTracker {
   }
 
   public void cleanup() {
-    if (!inflightBatchesPerAddress.isEmpty()) {
-      logger.warn("Clear {}", this.getClass().getSimpleName());
-      inflightBatchesPerAddress.clear();
-      totalInflightReqs.reset();
-    }
+    logger.info(
+        "Cleanup {} requests and {} batches in flight.",
+        totalInflightReqs.sum(),
+        inflightBatchesPerAddress.size());
+    cleaned = true;
+    inflightBatchesPerAddress.clear();
     pushStrategy.clear();
   }
 }
