This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new f281d376f [CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
f281d376f is described below
commit f281d376fd31e2b27e2a81488d60478e8646f552
Author: SteNicholas <[email protected]>
AuthorDate: Fri Sep 6 17:25:13 2024 +0800
[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight
### What changes were proposed in this pull request?
`InFlightRequestTracker` should not reset `totalInflightReqs` during cleanup, to avoid a negative `totalInflightReqs` in `limitZeroInFlight`. Follow-up to #2621.
### Why are the changes needed?
After #2621, there is a common case in which attempt 0 succeeds in its write and the Celeborn client successfully calls `mapperEnd`, but the attempt then fails due to an exception such as `ExecutorLostFailure`. Meanwhile, other attempts are rerun and clean up `pushState` because the mapper has ended. This case causes the `Waiting timeout for task %s while limiting zero in-flight requests` exception in `limitZeroInFlight`. Therefore, `InFlightRequestTracker` could not reset `totalInflightReqs` f [...]

```
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] WARN InFlightRequestTracker: Clear InFlightRequestTracker
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.10:9092, creating a new one.
24/09/05 08:27:04 [data-client-5-1] WARN InFlightRequestTracker: BatchIdSet of 172.27.164.39:9092 is null.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.38:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.36.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.160.19:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.32.31:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.44.32:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.168.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.172.18:9092, creating a new one.
24/09/05 08:27:04 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] INFO TransportClientFactory: Found inactive connection to /172.27.164.19:9092, creating a new one.
24/09/05 08:27:15 [dispatcher-Executor] INFO Executor: Executor is trying to kill task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:27:15 [Executor task launch worker for task 706.1 in stage 1.0 (TID 1203)] INFO Executor: Executor interrupted and killed task 706.1 in stage 1.0 (TID 1203), reason: another attempt succeeded
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR InFlightRequestTracker: After waiting for 1200000 ms, there are still [] in flight, which exceeds the current limit 0.
24/09/05 08:47:08 [Executor task launch worker for task 715.2 in stage 1.0 (TID 1205)] ERROR Executor: Exception in task 715.2 in stage 1.0 (TID 1205)
org.apache.celeborn.common.exception.CelebornIOException: Waiting timeout for task 1-715-2 while limiting zero in-flight requests
    at org.apache.celeborn.client.ShuffleClientImpl.limitZeroInFlight(ShuffleClientImpl.java:676)
    at org.apache.celeborn.client.ShuffleClientImpl.mapEndInternal(ShuffleClientImpl.java:1555)
    at org.apache.celeborn.client.ShuffleClientImpl.mapperEnd(ShuffleClientImpl.java:1539)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.close(HashBasedShuffleWriter.java:367)
    at org.apache.spark.shuffle.celeborn.HashBasedShuffleWriter.write(HashBasedShuffleWriter.java:175)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:144)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:598)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:603)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
24/09/05 08:49:21 [dispatcher-Executor] INFO YarnCoarseGrainedExecutorBackend: Driver commanded a shutdown
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] WARN ShuffleClientImpl: Shuffle client has been shutdown!
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO MemoryStore: MemoryStore cleared
24/09/05 08:49:21 [CoarseGrainedExecutorBackend-stop-executor] INFO BlockManager: BlockManager stopped
24/09/05 08:49:21 [pool-5-thread-1] INFO ShutdownHookManager: Shutdown hook called
```
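The race described above can be reproduced in miniature: one thread waits for in-flight requests to drain while another thread runs cleanup. The sketch below is not the actual Celeborn code (class and method names are simplified, and the timing constants are arbitrary); it only illustrates the shape of the fix in this patch, where cleanup sets a volatile `cleaned` flag instead of resetting the counter, so the waiter bails out cleanly rather than spinning until timeout on a counter that late `removeBatch` calls may have driven negative.

```java
import java.util.concurrent.atomic.LongAdder;

// Minimal sketch (hypothetical names): cleanup no longer resets the in-flight
// counter; it only marks the tracker cleaned, and the wait loop checks the flag.
public class TrackerSketch {
    private final LongAdder totalInflightReqs = new LongAdder();
    private volatile boolean cleaned = false;

    void addBatch() { totalInflightReqs.increment(); }

    void removeBatch() { totalInflightReqs.decrement(); }

    // Resetting totalInflightReqs here would let a late removeBatch() drive
    // the sum negative; setting a flag avoids that entirely.
    void cleanup() { cleaned = true; }

    // Returns true if the wait would time out, false if it can return early.
    boolean limitZeroInFlight(int maxSpins) throws InterruptedException {
        int times = maxSpins;
        while (times > 0) {
            if (cleaned) {
                return false; // push state already cleaned up: nothing to wait for
            }
            if (totalInflightReqs.sum() == 0) {
                return false; // all requests drained normally
            }
            Thread.sleep(1);
            times--;
        }
        return true; // timed out
    }

    public static void main(String[] args) throws Exception {
        TrackerSketch tracker = new TrackerSketch();
        tracker.addBatch(); // one request still "in flight"

        // Another thread cleans up while the main thread is waiting,
        // mimicking mapperEnd cleanup racing with limitZeroInFlight.
        Thread cleaner = new Thread(() -> {
            try { Thread.sleep(5); } catch (InterruptedException ignored) {}
            tracker.cleanup();
        });
        cleaner.start();

        boolean timedOut = tracker.limitZeroInFlight(1000);
        cleaner.join();
        System.out.println("timedOut=" + timedOut);
    }
}
```

With the flag in place the waiter returns early instead of burning the full timeout, which is exactly the behavior change the hunks below introduce.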
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2725 from SteNicholas/CELEBORN-1506.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 5875f6871fc5bfa2483cf35b1a1968c38c6dea0b)
Signed-off-by: Shuang <[email protected]>
---
.../common/write/InFlightRequestTracker.java | 67 +++++++++++++---------
1 file changed, 41 insertions(+), 26 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
index 9350c09ef..f46a75650 100644
--- a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
+++ b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java
@@ -49,6 +49,8 @@ public class InFlightRequestTracker {
private final int maxInFlightReqsTotal;
private final LongAdder totalInflightReqs = new LongAdder();
+ private volatile boolean cleaned = false;
+
public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
this.waitInflightTimeoutMs = conf.clientPushLimitInFlightTimeoutMs();
this.delta = conf.clientPushLimitInFlightSleepDeltaMs();
@@ -67,11 +69,10 @@ public class InFlightRequestTracker {
public void removeBatch(int batchId, String hostAndPushPort) {
Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
- // TODO: Need to debug why batchIdSet will be null.
if (batchIdSet != null) {
batchIdSet.remove(batchId);
} else {
- logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
+ logger.info("Batches of {} in flight is null.", hostAndPushPort);
}
totalInflightReqs.decrement();
}
@@ -97,19 +98,24 @@ public class InFlightRequestTracker {
pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
int currentMaxReqsInFlight =
pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);
- Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
+ Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
long times = waitInflightTimeoutMs / delta;
try {
while (times > 0) {
- if (totalInflightReqs.sum() <= maxInFlightReqsTotal
- && batchIdSet.size() <= currentMaxReqsInFlight) {
- break;
- }
- if (pushState.exception.get() != null) {
- throw pushState.exception.get();
+ if (cleaned) {
+ // MapEnd cleans up push state, which does not exceed the max requests in flight limit.
+ return false;
+ } else {
+ if (totalInflightReqs.sum() <= maxInFlightReqsTotal
+ && batchIdSet.size() <= currentMaxReqsInFlight) {
+ break;
+ }
+ if (pushState.exception.get() != null) {
+ throw pushState.exception.get();
+ }
+ Thread.sleep(delta);
+ times--;
}
- Thread.sleep(delta);
- times--;
}
} catch (InterruptedException e) {
pushState.exception.set(new CelebornIOException(e));
@@ -118,10 +124,12 @@ public class InFlightRequestTracker {
if (times <= 0) {
logger.warn(
"After waiting for {} ms, "
- + "there are still {} batches in flight "
- + "for hostAndPushPort {}, "
+ + "there are still {} requests in flight (limit: {}): "
+ + "{} batches for hostAndPushPort {}, "
+ "which exceeds the current limit {}.",
waitInflightTimeoutMs,
+ totalInflightReqs.sum(),
+ maxInFlightReqsTotal,
batchIdSet.size(),
hostAndPushPort,
currentMaxReqsInFlight);
@@ -142,14 +150,19 @@ public class InFlightRequestTracker {
try {
while (times > 0) {
- if (totalInflightReqs.sum() == 0) {
- break;
+ if (cleaned) {
+ // MapEnd cleans up push state, which does not exceed the zero requests in flight limit.
+ return false;
+ } else {
+ if (totalInflightReqs.sum() == 0) {
+ break;
+ }
+ if (pushState.exception.get() != null) {
+ throw pushState.exception.get();
+ }
+ Thread.sleep(delta);
+ times--;
}
- if (pushState.exception.get() != null) {
- throw pushState.exception.get();
- }
- Thread.sleep(delta);
- times--;
}
} catch (InterruptedException e) {
pushState.exception.set(new CelebornIOException(e));
@@ -158,9 +171,10 @@ public class InFlightRequestTracker {
if (times <= 0) {
logger.error(
"After waiting for {} ms, "
- + "there are still {} in flight, "
+ + "there are still {} requests in flight: {}, "
+ "which exceeds the current limit 0.",
waitInflightTimeoutMs,
+ totalInflightReqs.sum(),
inflightBatchesPerAddress.entrySet().stream()
.filter(c -> !c.getValue().isEmpty())
.map(c -> c.getValue().size() + " batches for hostAndPushPort "
+ c.getKey())
@@ -179,11 +193,12 @@ public class InFlightRequestTracker {
}
public void cleanup() {
- if (!inflightBatchesPerAddress.isEmpty()) {
- logger.warn("Clear {}", this.getClass().getSimpleName());
- inflightBatchesPerAddress.clear();
- totalInflightReqs.reset();
- }
+ logger.info(
+ "Cleanup {} requests and {} batches in flight.",
+ totalInflightReqs.sum(),
+ inflightBatchesPerAddress.size());
+ cleaned = true;
+ inflightBatchesPerAddress.clear();
pushStrategy.clear();
}
}