Hi Wenrui,

I suspect another issue might be causing the connection failure. You can check 
whether the netty server has already bound to its port and started listening 
before the client requests the connection. If some time-consuming process 
during TM startup delays the start of the netty server, the server may not be 
ready when the client requests a connection, which can cause a connection 
timeout or failure.
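
A minimal sketch of such a check, assuming plain JDK sockets rather than 
anything Flink-specific: the hypothetical PortProbe class below tries to open a 
TCP connection to the TM's data port and reports whether something is 
listening. The class name, the 5-second probe timeout, and the command-line 
arguments are illustrative assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /**
     * Returns true if a TCP connection to host:port can be established
     * within timeoutMillis, false if it is refused or times out.
     */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unreachable host, and SocketTimeoutException.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical arguments: <host> <port>, e.g. the address from the stack trace.
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        long start = System.currentTimeMillis();
        boolean up = isListening(host, port, 5000);
        long elapsed = System.currentTimeMillis() - start;
        System.out.println(host + ":" + port + " listening=" + up + " after " + elapsed + " ms");
    }
}
```

Running it repeatedly against the data port of a TM that is still starting 
would show roughly when the netty server begins accepting connections.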

From your description, the problem seems to exist on only some TMs. When you 
decrease the total parallelism, the job might simply avoid the problematic TM 
and therefore not hit this issue. The default number of netty threads and the 
default timeout should be sufficient for normal cases.

Best,
Zhijiang


------------------------------------------------------------------
From:Wenrui Meng <wenruim...@gmail.com>
Send Time:Wednesday, January 9, 2019 18:18
To:Till Rohrmann <trohrm...@apache.org>
Cc:user <user@flink.apache.org>; Konstantin <konstan...@data-artisans.com>
Subject:Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special Uber version of Flink. I tried to 
ping the connected host from the connecting host, and the connection seems very 
stable. For the connection timeout, I did set it to 20 min, but it still 
reports the timeout after 2 minutes. Could you let me know how you tested the 
timeout setting locally?

Thanks,
Wenrui
On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure 
whether this is so different. Could it be that your network is overloaded or 
not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till
On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng <wenruim...@gmail.com> wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we 
decrease the total parallelism, the timeout issue can be avoided. But we do 
need that number of TaskManagers to process the data. In addition, once I 
increased the netty server threads to 128, the error changed to the following 
one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
        at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: athena464-sjc1/10.70.129.13:39466
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
        ... 6 common frames omitted


Thanks,
Wenrui
On Mon, Jan 7, 2019 at 2:38 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Wenrui,

the code to set the connect timeout looks ok to me [1]. I also tested it 
locally and checked that the timeout is correctly registered in Netty's 
AbstractNioChannel [2].

Increasing the number of threads to 128 should not be necessary. But it could 
indicate that there is some long lasting or blocking operation being executed 
by the threads.

How does the job submission and cluster configuration work with AthenaX? Will 
the platform spawn for each job a new Flink cluster for which you can specify 
the cluster configuration?

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java#L102
[2] 
https://github.com/netty/netty/blob/netty-4.0.27.Final/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java#L207

Cheers,
Till
On Sat, Jan 5, 2019 at 2:22 AM Wenrui Meng <wenruim...@gmail.com> wrote:
Hi Till,

Thanks for your reply and help on this issue.

I increased taskmanager.network.netty.client.connectTimeoutSec to 1200, which 
is 20 minutes. But it seems the connection does not respect this timeout. 
In addition, I increased both taskmanager.network.request-backoff.max and 
taskmanager.registration.max-backoff to 20 min.
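
For reference, these settings might look roughly like this in flink-conf.yaml. 
This is a sketch only: the keys are the ones named above, but the units 
(seconds for connectTimeoutSec, milliseconds for request-backoff.max, a 
duration string for registration.max-backoff) are assumptions that should be 
checked against the configuration documentation of the Flink version in use.

```yaml
# Netty client connect timeout; assumed to be in seconds (1200 s = 20 min)
taskmanager.network.netty.client.connectTimeoutSec: 1200

# Maximum backoff for partition requests; assumed to be in milliseconds
taskmanager.network.request-backoff.max: 1200000

# Maximum backoff for TaskManager registration; assumed to take a duration string
taskmanager.registration.max-backoff: 20 min
```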

One thing I found helpful to some extent is increasing 
taskmanager.network.netty.server.numThreads. When I increase it to 128 threads, 
the job can succeed sometimes. But increasing it further doesn't solve the 
problem. We have 100 parallel intermediate results, so there are too many 
partition requests. I think that's why it times out. The solution should be to 
increase the connection timeout. But I think there is some issue where the 
connection doesn't respect the timeout config. 

We will definitely try the latest Flink version. But at Uber, there is a team 
responsible for providing a platform with Flink. They will upgrade it at the 
end of this month. Meanwhile, I would like to ask for some help investigating 
how to increase the connection timeout and make sure it is respected. 

Thanks,
Wenrui
On Fri, Jan 4, 2019 at 5:27 AM Till Rohrmann <trohrm...@apache.org> wrote:
Hi Wenrui,

from the logs I cannot spot anything suspicious. Which configuration parameters 
have you changed exactly? Does the JobManager log contain anything suspicious?

The current Flink version changed quite a bit wrt 1.4. Thus, it might be worth 
a try to run the job with the latest Flink version.

Cheers,
Till
On Thu, Jan 3, 2019 at 3:00 PM Wenrui Meng <wenruim...@gmail.com> wrote:
Hi,

I consistently get a connection timeout issue when creating the 
partitionRequestClient in Flink 1.4. I tried to ping from the connecting host 
to the connected host, and the ping latency is consistently less than 0.1 ms. 
So it's probably not due to the cluster status. I also tried increasing the max 
backoff, the network timeout, and some other settings, but it doesn't help. 

I enabled Flink's debug log but did not find any suspicious or useful 
information to help me fix the issue. Here is the link to the jobManager and 
taskManager logs. The connecting host is the host that throws the exception. 
The connected host is the host that the connecting host tries to request the 
partition from. 

Since our platform is not up to date yet, the Flink version I used here is 
1.4. But I noticed that this code has not changed much on the master 
branch. Any help will be appreciated. 

Here is the stack trace of the exception:

from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
 at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
 at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
 at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
 at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
 at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
 at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
 at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
 at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might indicate that the remote task manager has been lost.
 at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
 at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
 at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
 at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
 at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
 ... 1 common frames omitted
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: connection timed out: athena485-sjc1/10.70.132.8:34185
 at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
 ... 6 common frames omitted

Thanks,
Wenrui
