SteNicholas opened a new pull request, #3450:
URL: https://github.com/apache/celeborn/pull/3450

   ### What changes were proposed in this pull request?
   
   `CelebornBufferStream` should invoke `openStreamInternal` in 
`moveToNextPartitionIfPossible` to avoid client creation timeout.
   
   ### Why are the changes needed?
   
   In the production environment, many `CelebornIOException`s are caused by client creation timing out, as in the following log:
   
   ```
   2025-08-22 16:20:10,681 INFO  [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 @ (dataPort=1).
   java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
        at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
        at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
        at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
        at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
        at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
        at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
        at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:991)
   
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
        at java.lang.Thread.run(Thread.java:991) ~[?:?]
   ```
   
   The client creation timeout is caused by creating the client on the Netty callback thread. `CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid it.
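
To illustrate the failure mode described above, here is a minimal sketch in plain Java. It is not Celeborn or Netty code and not the diff of this PR. It assumes the connect result can only be delivered by the same single-threaded loop that is running the callback, which is one plausible reading of the stack trace. All names (`CallbackThreadDemo`, `connect`, `run`, the `helper` executor) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallbackThreadDemo {

  // Hypothetical stand-in for client creation: the "connected" signal is
  // delivered by a task that must run on the event loop, and the caller
  // blocks until it arrives or the timeout expires.
  static String connect(ExecutorService eventLoop, long timeoutMs) throws Exception {
    CompletableFuture<String> connected = new CompletableFuture<>();
    eventLoop.execute(() -> connected.complete("connected"));
    try {
      return connected.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";
    }
  }

  // Runs a callback on a single-threaded "event loop" (an assumption that
  // models one Netty event-loop thread). The callback opens the next stream
  // either inline on that thread or on a separate helper thread.
  static String run(boolean offload) {
    ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    ExecutorService helper = Executors.newSingleThreadExecutor();
    try {
      CompletableFuture<String> result = new CompletableFuture<>();
      eventLoop.execute(() -> {
        Runnable open = () -> {
          try {
            result.complete(connect(eventLoop, 200));
          } catch (Exception e) {
            result.completeExceptionally(e);
          }
        };
        if (offload) {
          helper.execute(open); // callback returns at once, loop stays free
        } else {
          open.run(); // blocks the only thread that could finish the connect
        }
      });
      return result.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      eventLoop.shutdownNow();
      helper.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("inline:    " + run(false));
    System.out.println("offloaded: " + run(true));
  }
}
```

In this sketch the inline variant times out because the loop thread is waiting on a task queued behind itself, while the offloaded variant connects. Whether the actual change uses a separate executor or another mechanism is not stated in this description.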
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Manual test.


