This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new fe267496c [CELEBORN-2154] Optimize the exception handling of DFS read
to avoid tasks from getting stuck
fe267496c is described below
commit fe267496cd2a067f86142e2fb03713bfcee3ab7f
Author: xxx <[email protected]>
AuthorDate: Sat Sep 20 15:11:30 2025 +0800
[CELEBORN-2154] Optimize the exception handling of DFS read to avoid tasks
from getting stuck
### What changes were proposed in this pull request?
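Optimize the exception handling of DFS reads to prevent tasks from getting
stuck. The fetch thread in `DfsPartitionReader` now records any exception it
catches, and `PartitionReader#next()` is declared to throw `Exception` so the
recorded failure can be rethrown to the caller.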
### Why are the changes needed?
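Previously, an exception caught by the generic `catch (Exception e)` handler
in the DFS fetch thread was only logged and not recorded, so the task reading
the partition could get stuck instead of failing.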
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3482 from xy2953396112/CELEBORN-2154.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 52d30574f1d1887bb90ecc4e260524229876160c)
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/client/read/DfsPartitionReader.java | 11 ++++++-----
.../java/org/apache/celeborn/client/read/PartitionReader.java | 3 +--
2 files changed, 7 insertions(+), 7 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 63136f00d..5493e7f29 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -60,7 +60,7 @@ public class DfsPartitionReader implements PartitionReader {
private final long shuffleChunkSize;
private final int fetchMaxReqsInFlight;
private final LinkedBlockingQueue<Pair<Integer, ByteBuf>> results;
- private final AtomicReference<IOException> exception = new
AtomicReference<>();
+ private final AtomicReference<Exception> exception = new AtomicReference<>();
private volatile boolean closed = false;
private ExecutorService fetchThread;
private boolean fetchThreadStarted;
@@ -236,7 +236,7 @@ public class DfsPartitionReader implements PartitionReader {
}
@Override
- public ByteBuf next() throws IOException, InterruptedException {
+ public ByteBuf next() throws Exception {
Pair<Integer, ByteBuf> chunk = null;
checkpoint();
if (!fetchThreadStarted) {
@@ -283,6 +283,7 @@ public class DfsPartitionReader implements PartitionReader {
}
} catch (Exception e) {
logger.warn("Fetch thread is cancelled.", e);
+ exception.set(e);
// cancel a task for speculative, ignore this exception
}
logger.debug("fetch {} is done.",
location.getStorageInfo().getFilePath());
@@ -297,7 +298,7 @@ public class DfsPartitionReader implements PartitionReader {
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
logger.debug("poll result with result size: {}", results.size());
}
- } catch (InterruptedException e) {
+ } catch (Exception e) {
logger.error("PartitionReader thread interrupted while fetching data.");
throw e;
}
@@ -306,8 +307,8 @@ public class DfsPartitionReader implements PartitionReader {
return chunk.getRight();
}
- private void checkException() throws IOException {
- IOException e = exception.get();
+ private void checkException() throws Exception {
+ Exception e = exception.get();
if (e != null) {
throw e;
}
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
index 0ebb94e74..247eacff5 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
@@ -17,7 +17,6 @@
package org.apache.celeborn.client.read;
-import java.io.IOException;
import java.util.Optional;
import io.netty.buffer.ByteBuf;
@@ -28,7 +27,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
public interface PartitionReader {
boolean hasNext();
- ByteBuf next() throws IOException, InterruptedException;
+ ByteBuf next() throws Exception;
void close();