SteNicholas opened a new pull request, #3450:
URL: https://github.com/apache/celeborn/pull/3450
### What changes were proposed in this pull request?
`CelebornBufferStream` should invoke `openStreamInternal` in
`moveToNextPartitionIfPossible` to avoid client creation timeouts.
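The description does not spell out how the stream is opened after this change. Assuming the intent is to keep blocking client creation off the thread that delivers the stream-end callback, the Java sketch below shows that general pattern. `AsyncPartitionSwitchSketch`, `openStreamExecutor`, and the bodies of the stand-in methods are hypothetical and are not Celeborn's API; only the method names `openStreamInternal` and `moveToNextPartitionIfPossible` come from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical sketch (not Celeborn code): the stream-end callback hands the blocking
 * "open the next partition" work to a dedicated executor instead of running it itself.
 */
public class AsyncPartitionSwitchSketch {
  // Hypothetical dedicated thread for blocking work such as creating a client.
  private final ExecutorService openStreamExecutor =
      Executors.newSingleThreadExecutor(
          runnable -> {
            Thread thread = new Thread(runnable, "open-stream-executor");
            thread.setDaemon(true);
            return thread;
          });

  // Stand-in for openStreamInternal(): in the real client this step creates a
  // client and may block until the connect timeout expires.
  public String openStreamInternal(int partitionIndex) {
    return "stream-" + partitionIndex + " opened on " + Thread.currentThread().getName();
  }

  // Stand-in for moveToNextPartitionIfPossible(): returns to the calling
  // (callback) thread immediately; the future completes once the stream is open.
  public CompletableFuture<String> moveToNextPartitionIfPossible(int nextPartitionIndex) {
    return CompletableFuture.supplyAsync(
        () -> openStreamInternal(nextPartitionIndex), openStreamExecutor);
  }

  public void close() {
    openStreamExecutor.shutdown();
  }

  public static void main(String[] args) {
    AsyncPartitionSwitchSketch sketch = new AsyncPartitionSwitchSketch();
    System.out.println(sketch.moveToNextPartitionIfPossible(1).join());
    // prints: stream-1 opened on open-stream-executor
    sketch.close();
  }
}
```

A real implementation would also need to report a failed future back to the reader, as `RemoteBufferStreamReader.errorReceived` does in the trace; the sketch omits that.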
### Why are the changes needed?
In the production environment there are many `CelebornIOException`s caused by
client creation timeouts, such as the following:
```
2025-08-22 16:20:10,681 INFO [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 @ (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at java.lang.Thread.run(Thread.java:991) ~[?:?]
```
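The timeout is caused by creating a client on the Netty callback thread. As the
stack trace shows, `RemoteBufferStreamReader.onStreamEnd` is invoked from
`ReadClientHandler.receive` on the Netty event loop thread (`NioEventLoop.run`),
and from there `moveToNextPartitionIfPossible` calls `openStreamInternal`, which
creates the client via `FlinkTransportClientFactory.createClientWithRetry`.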
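The description does not explain why creating a client on the Netty callback thread ends in a timeout. The Java sketch below is only a simplified model, an assumption rather than Celeborn or Netty code: a single-threaded `ExecutorService` stands in for the event loop, and a task that must run on that same thread stands in for connection setup. Blocking the only thread while waiting for that task can only end in a timeout, whereas handing the task to another thread completes normally. `CallbackThreadBlockingModel` and `simulate` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Simplified model (an assumption, not Celeborn or Netty code): a callback running on a
 * single "event loop" thread blocks while waiting for a "connect" task.
 */
public class CallbackThreadBlockingModel {
  public static String simulate(boolean connectOnCallbackThread, long timeoutMs) {
    ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    ExecutorService otherThread = Executors.newSingleThreadExecutor();
    try {
      Future<String> callback =
          eventLoop.submit(
              () -> {
                // If "connect" is queued on the event loop, it cannot run until this
                // callback returns, so the bounded wait below always times out.
                ExecutorService connectExecutor =
                    connectOnCallbackThread ? eventLoop : otherThread;
                Future<String> connect = connectExecutor.submit(() -> "connected");
                try {
                  return connect.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                  return "timed out (" + timeoutMs + " ms)";
                }
              });
      return callback.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      // Drops the still-queued "connect" task in the blocking case.
      eventLoop.shutdownNow();
      otherThread.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(simulate(true, 200)); // timed out (200 ms)
    System.out.println(simulate(false, 1000)); // connected
  }
}
```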
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
--
This is an automated message from the Apache Git Service.