This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a3b9f35b [CELEBORN-2129] CelebornBufferStream should invoke 
openStreamInternal in moveToNextPartitionIfPossible to avoid client creation 
timeout
1a3b9f35b is described below

commit 1a3b9f35b50d467beb7a39788588f1bb05e65441
Author: SteNicholas <[email protected]>
AuthorDate: Wed Aug 27 14:21:15 2025 +0800

    [CELEBORN-2129] CelebornBufferStream should invoke openStreamInternal in 
moveToNextPartitionIfPossible to avoid client creation timeout
    
    ### What changes were proposed in this pull request?
    
    `CelebornBufferStream` should invoke `openStreamInternal` in 
`moveToNextPartitionIfPossible` to avoid client creation timeout.
    
    ### Why are the changes needed?
    
    In production environments, many `CelebornIOException`s are caused by client 
creation timeouts, for example:
    
    ```
    2025-08-22 16:20:10,681 INFO  [flink-akka.actor.default-dispatcher-40] 
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - 
[vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS 
$6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, 
mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, 
hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], selec [...]
    java.io.IOException: 
org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 
timed out (60000 ms)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
            at 
org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
            at 
org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
            at 
org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
            at 
org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
            at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:991)
    
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
            at java.lang.Thread.run(Thread.java:991) ~[?:?]
    ```
    
    The timeout is caused by creating a client on Netty's callback thread. In 
the non-sync case, `moveToNextPartitionIfPossible` should therefore invoke 
`openStreamInternal` on a separate thread pool (`openStreamThreadPool`) instead 
of on the callback thread.
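    
    The non-sync branch in the diff below hands `openStreamInternal` to a thread 
    pool through `CompletableFuture.runAsync` and reports failures in 
    `whenComplete`. A minimal, self-contained sketch of that pattern follows. 
    The names `AsyncOpenSketch`, `BlockingOpen`, and `openAsync` are hypothetical 
    and are not part of Celeborn; the sketch only illustrates moving a blocking 
    call off the calling thread.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Consumer;
    
    /** Hypothetical illustration only; not Celeborn's actual implementation. */
    public class AsyncOpenSketch {
    
      /** Hypothetical stand-in for a blocking open that may throw. */
      public interface BlockingOpen {
        void run() throws Exception;
      }
    
      /**
       * Runs the blocking open on the given pool so that the calling thread
       * (for example an I/O callback thread) is not blocked. Failures are
       * forwarded to errorConsumer.
       */
      public static CompletableFuture<Void> openAsync(
          BlockingOpen open, ExecutorService pool, Consumer<Throwable> errorConsumer) {
        return CompletableFuture.runAsync(
                () -> {
                  try {
                    open.run();
                  } catch (Exception e) {
                    // Runnable cannot throw checked exceptions, so wrap them.
                    throw new RuntimeException(e);
                  }
                },
                pool)
            .whenComplete(
                (result, throwable) -> {
                  if (throwable != null) {
                    // The throwable may arrive wrapped in a CompletionException.
                    errorConsumer.accept(throwable);
                  }
                });
      }
    
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
          openAsync(
                  () -> System.out.println("opened on " + Thread.currentThread().getName()),
                  pool,
                  t -> System.err.println("open failed: " + t))
              .join();
        } finally {
          pool.shutdown();
        }
      }
    }
    ```
    
    In this sketch the caller returns as soon as the task is submitted, and the 
    error path plays the role that `messageConsumer.accept(new 
    TransportableError(...))` plays in the patch.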
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test.
    
    Closes #3450 from SteNicholas/CELEBORN-2129.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../plugin/flink/client/CelebornBufferStream.java  | 66 ++++++++++++++++------
 .../flink/client/FlinkShuffleClientImpl.java       | 13 ++++-
 .../org/apache/celeborn/common/CelebornConf.scala  | 10 ++++
 docs/configuration/client.md                       |  1 +
 4 files changed, 73 insertions(+), 17 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
index a0a498ff4..04f8b2387 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java
@@ -19,6 +19,8 @@ package org.apache.celeborn.plugin.flink.client;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -64,6 +66,7 @@ public class CelebornBufferStream {
   private Supplier<ByteBuf> bufferSupplier;
   private int initialCredit;
   private Consumer<RequestMessage> messageConsumer;
+  private ExecutorService openStreamThreadPool;
 
   public CelebornBufferStream() {}
 
@@ -74,7 +77,8 @@ public class CelebornBufferStream {
       PartitionLocation[] locations,
       int subIndexStart,
       int subIndexEnd,
-      long pushDataTimeoutMs) {
+      long pushDataTimeoutMs,
+      ExecutorService openStreamThreadPool) {
     this.mapShuffleClient = mapShuffleClient;
     this.clientFactory = dataClientFactory;
     this.shuffleKey = shuffleKey;
@@ -82,6 +86,7 @@ public class CelebornBufferStream {
     this.subIndexStart = subIndexStart;
     this.subIndexEnd = subIndexEnd;
     this.pushDataTimeoutMs = pushDataTimeoutMs;
+    this.openStreamThreadPool = openStreamThreadPool;
   }
 
   public void open(
@@ -161,7 +166,8 @@ public class CelebornBufferStream {
       PartitionLocation[] locations,
       int subIndexStart,
       int subIndexEnd,
-      long pushDataTimeoutMs) {
+      long pushDataTimeoutMs,
+      ExecutorService openStreamThreadPool) {
     if (locations == null || locations.length == 0) {
       return empty();
     } else {
@@ -172,7 +178,8 @@ public class CelebornBufferStream {
           locations,
           subIndexStart,
           subIndexEnd,
-          pushDataTimeoutMs);
+          pushDataTimeoutMs,
+          openStreamThreadPool);
     }
   }
 
@@ -214,7 +221,7 @@ public class CelebornBufferStream {
       @Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer,
       boolean sync) {
     logger.debug(
-        "MoveToNextPartitionIfPossible in this:{},  endedStreamId: {}, 
currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}",
+        "MoveToNextPartitionIfPossible in this: {},  endedStreamId: {}, 
currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.",
         this,
         endedStreamId,
         currentLocationIndex.get(),
@@ -226,18 +233,45 @@ public class CelebornBufferStream {
     }
 
     if (currentLocationIndex.get() < locations.length) {
-      try {
-        openStreamInternal(requiredSegmentIdConsumer, sync);
-        logger.debug(
-            "MoveToNextPartitionIfPossible after openStream this:{},  
endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, 
locationsLength:{}",
-            this,
-            endedStreamId,
-            currentLocationIndex.get(),
-            streamId,
-            locations.length);
-      } catch (Exception e) {
-        logger.warn("Failed to open stream and report to flink framework. ", 
e);
-        messageConsumer.accept(new TransportableError(0L, e));
+      if (sync) {
+        try {
+          openStreamInternal(requiredSegmentIdConsumer, true);
+          logger.debug(
+              "MoveToNextPartitionIfPossible after openStream this: {},  
endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, 
locationsLength: {}.",
+              this,
+              endedStreamId,
+              currentLocationIndex.get(),
+              streamId,
+              locations.length);
+        } catch (Exception e) {
+          logger.warn("Failed to open stream and report to flink framework. ", 
e);
+          messageConsumer.accept(new TransportableError(0L, e));
+        }
+      } else {
+        CompletableFuture.runAsync(
+                () -> {
+                  try {
+                    openStreamInternal(requiredSegmentIdConsumer, false);
+                  } catch (IOException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                  }
+                },
+                openStreamThreadPool)
+            .whenComplete(
+                (result, throwable) -> {
+                  if (throwable == null) {
+                    logger.debug(
+                        "MoveToNextPartitionIfPossible after openStream this: 
{},  endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, 
locationsLength: {}.",
+                        this,
+                        endedStreamId,
+                        currentLocationIndex.get(),
+                        streamId,
+                        locations.length);
+                  } else {
+                    logger.warn("Failed to open stream and report to flink 
framework. ", throwable);
+                    messageConsumer.accept(new TransportableError(0L, 
throwable));
+                  }
+                });
       }
     }
   }
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
index 5b402b2ed..87a80006a 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -64,6 +65,7 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
 import org.apache.celeborn.common.util.CollectionUtils;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.PbSerDeUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.common.write.PushState;
 import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;
@@ -85,6 +87,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl 
{
   /** The buffer size bytes in flink, default value is 32KB. */
   private final int bufferSizeBytes;
 
+  private final ExecutorService openStreamThreadPool;
+
   public static FlinkShuffleClientImpl get(
       String appUniqueId,
       String driverHost,
@@ -160,6 +164,9 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
     if (readClientHandler != null) {
       readClientHandler.close();
     }
+    if (openStreamThreadPool != null) {
+      ThreadUtils.shutdown(openStreamThreadPool);
+    }
   }
 
   public FlinkShuffleClientImpl(
@@ -180,6 +187,9 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
             dataTransportConf, readClientHandler, 
conf.clientCloseIdleConnections());
     this.setupLifecycleManagerRef(driverHost, port);
     this.driverTimestamp = driverTimestamp;
+    this.openStreamThreadPool =
+        ThreadUtils.newDaemonCachedThreadPool(
+            "client-buffer-stream-opener", 
conf.clientFlinkOpenStreamThreads(), 60);
   }
 
   private void initializeTransportClientFactory() {
@@ -238,7 +248,8 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
           partitionLocations,
           subPartitionIndexStart,
           subPartitionIndexEnd,
-          pushDataTimeout);
+          pushDataTimeout,
+          openStreamThreadPool);
     }
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5cf7bf12e..7379a19df 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1473,6 +1473,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientFlinkDataCompressionEnabled: Boolean = 
get(CLIENT_DATA_COMPRESSION_ENABLED)
   def clientFlinkMetricsScopeNamingShuffle: String =
     get(CLIENT_METRICS_SCOPE_NAMING_SHUFFLE)
+  def clientFlinkOpenStreamThreads: Int =
+    
get(CLIENT_OPEN_STREAM_THREADS).getOrElse(Runtime.getRuntime.availableProcessors())
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
 
@@ -6031,6 +6033,14 @@ object CelebornConf extends Logging {
       .createWithDefault(
         
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>")
 
+  val CLIENT_OPEN_STREAM_THREADS: OptionalConfigEntry[Int] =
+    buildConf("celeborn.client.flink.open.stream.threads")
+      .categories("client")
+      .doc("Thread number of flink shuffle client to open buffer stream. 
Default value is Runtime.getRuntime.availableProcessors.")
+      .version("0.6.1")
+      .intConf
+      .createOptional
+
   val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] =
     buildConf("celeborn.client.mr.pushData.max")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index f9f14fe65..8e640d0f3 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -42,6 +42,7 @@ license: |
 | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a 
input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate | 
 | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | 
Whether to support floating buffer in Flink input gates. | 0.3.0 | 
remote-shuffle.job.support-floating-buffer-per-input-gate | 
 | celeborn.client.flink.metrics.scope.shuffle | 
&lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;subtask_index&gt;.&lt;shuffle_id&gt;
 | false | Defines the scope format string that is applied to all metrics 
scoped to a shuffle. Only effective when a identifier-based reporter is 
configured | 0.6.0 |  | 
+| celeborn.client.flink.open.stream.threads | &lt;undefined&gt; | false | 
Thread number of flink shuffle client to open buffer stream. Default value is 
Runtime.getRuntime.availableProcessors. | 0.6.1 |  | 
 | celeborn.client.flink.partitionConnectionException.enabled | false | false | 
If enabled, 
`org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException`
 would be thrown when RemoteBufferStreamReader finds that the current exception 
is about connection failure, then Flink can be aware of the lost Celeborn 
server side nodes and be able to re-compute affected data. | 0.6.0 |  | 
 | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved 
for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition | 
 | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | 
Whether to support floating buffer for result partitions. | 0.3.0 | 
remote-shuffle.job.support-floating-buffer-per-output-gate | 
