This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new fe267496c [CELEBORN-2154] Optimize the exception handling of DFS read to avoid tasks from getting stuck
fe267496c is described below

commit fe267496cd2a067f86142e2fb03713bfcee3ab7f
Author: xxx <[email protected]>
AuthorDate: Sat Sep 20 15:11:30 2025 +0800

    [CELEBORN-2154] Optimize the exception handling of DFS read to avoid tasks from getting stuck
    
    ### What changes were proposed in this pull request?
    
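    Optimize the exception handling of DFS reads to prevent tasks from getting stuck.
    In `DfsPartitionReader`, any exception caught in the fetch thread is now recorded
    in the reader's `exception` field, and `next()` rethrows it via `checkException()`.
    To allow non-`IOException` failures to be propagated, the stored exception type is
    widened from `IOException` to `Exception`. `PartitionReader#next()` now declares
    `throws Exception`.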
    
    ### Why are the changes needed?
    
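    Previously, an exception thrown in the DFS fetch thread was only logged
    ("Fetch thread is cancelled.") and then swallowed. In addition, only an
    `IOException` could be stored and rethrown to the caller. The reading task was
    therefore never told that fetching had failed and could get stuck waiting for data.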
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3482 from xy2953396112/CELEBORN-2154.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 52d30574f1d1887bb90ecc4e260524229876160c)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/client/read/DfsPartitionReader.java   | 11 ++++++-----
 .../java/org/apache/celeborn/client/read/PartitionReader.java |  3 +--
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 63136f00d..5493e7f29 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -60,7 +60,7 @@ public class DfsPartitionReader implements PartitionReader {
   private final long shuffleChunkSize;
   private final int fetchMaxReqsInFlight;
   private final LinkedBlockingQueue<Pair<Integer, ByteBuf>> results;
-  private final AtomicReference<IOException> exception = new 
AtomicReference<>();
+  private final AtomicReference<Exception> exception = new AtomicReference<>();
   private volatile boolean closed = false;
   private ExecutorService fetchThread;
   private boolean fetchThreadStarted;
@@ -236,7 +236,7 @@ public class DfsPartitionReader implements PartitionReader {
   }
 
   @Override
-  public ByteBuf next() throws IOException, InterruptedException {
+  public ByteBuf next() throws Exception {
     Pair<Integer, ByteBuf> chunk = null;
     checkpoint();
     if (!fetchThreadStarted) {
@@ -283,6 +283,7 @@ public class DfsPartitionReader implements PartitionReader {
               }
             } catch (Exception e) {
               logger.warn("Fetch thread is cancelled.", e);
+              exception.set(e);
               // cancel a task for speculative, ignore this exception
             }
             logger.debug("fetch {} is done.", 
location.getStorageInfo().getFilePath());
@@ -297,7 +298,7 @@ public class DfsPartitionReader implements PartitionReader {
             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
         logger.debug("poll result with result size: {}", results.size());
       }
-    } catch (InterruptedException e) {
+    } catch (Exception e) {
       logger.error("PartitionReader thread interrupted while fetching data.");
       throw e;
     }
@@ -306,8 +307,8 @@ public class DfsPartitionReader implements PartitionReader {
     return chunk.getRight();
   }
 
-  private void checkException() throws IOException {
-    IOException e = exception.get();
+  private void checkException() throws Exception {
+    Exception e = exception.get();
     if (e != null) {
       throw e;
     }
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
index 0ebb94e74..247eacff5 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/PartitionReader.java
@@ -17,7 +17,6 @@
 
 package org.apache.celeborn.client.read;
 
-import java.io.IOException;
 import java.util.Optional;
 
 import io.netty.buffer.ByteBuf;
@@ -28,7 +27,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 public interface PartitionReader {
   boolean hasNext();
 
-  ByteBuf next() throws IOException, InterruptedException;
+  ByteBuf next() throws Exception;
 
   void close();
 

Reply via email to