I just stumbled on this same problem without any associated ZK issues. We had a 
Kafka broker fail that caused this issue:

2018-07-18 02:48:13,497 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Produce: <output_topic_name> (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

This is the kind of error we should be robust to: the Kafka cluster will 
(reasonably quickly) recover and elect a new leader broker for the affected 
partition (in this case, partition #2). Maybe retries should be the default 
configuration? I believe the client uses the Kafka defaults (acks=0, 
retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I 
need to do anything more than that to get a more robust producer?
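
For reference, here is a minimal sketch of the producer settings described above. The class name, broker addresses, topic name, and the retry.backoff.ms value are illustrative assumptions, not taken from our job. The defaults vary by Kafka client version, so the sketch sets acks and retries explicitly rather than relying on them.

```java
import java.util.Properties;

public class RobustProducerConfig {

    /** Builds Kafka producer properties that retry transient broker failures. */
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write.
        props.setProperty("acks", "all");
        // Retry transient errors such as NetworkException instead of failing the send.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Pause between retries so a leader election has time to finish (assumed value).
        props.setProperty("retry.backoff.ms", "500");
        // One in-flight request per connection keeps retried batches in order,
        // at some cost in throughput.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker list, for illustration only.
        Properties props = build("broker1:9092,broker2:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
        // In the Flink job these properties would be passed to the sink, e.g.:
        //   new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props);
    }
}
```

Limiting in-flight requests to 1 is only needed if record ordering matters. With retries enabled and more than one request in flight, a retried batch can land after a later one.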

Ron

> On May 16, 2018, at 7:45 PM, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Ufuk, Piotr
> 
> Thanks for all of your replies. I knew that jobs are cancelled if the JM 
> loses the connection to ZK, but the JM didn't lose its connection in my case.
> My job failed because of the exception from KafkaProducer. However, the TM 
> lost its ZK connection both before and after that exception.
> So, as Piotr said, it looks like an error in the Kafka producer, and I will 
> pay more attention to it to see whether something unexpected happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
> Hi,
> 
> It looks like there was an error in the asynchronous job that sends records 
> to Kafka. This is probably collateral damage from losing the connection to 
> ZooKeeper.
> 
> Piotrek
> 
>> On 15 May 2018, at 13:33, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> Hey Tony,
>> 
>> thanks for the detailed report.
>> 
>> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and 
>> recovered when the connection is re-established (and one JM becomes leader 
>> again).
>> 
>> - Regarding the KafkaProducer: I'm not sure from the log message whether 
>> Flink closes the KafkaProducer because the job is cancelled or because there 
>> is a connectivity issue to the Kafka cluster. I'm including Piotr (cc), who 
>> has worked on the KafkaProducer in the past, in this thread. If it is a 
>> connectivity issue, it might also explain why you lost the connection to ZK.
>> 
>> Glad to hear that everything is back to normal. Keep us updated if something 
>> unexpected happens again.
>> 
>> – Ufuk
>> 
>> 
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920...@gmail.com> wrote:
>> Hi all,
>> 
>> I restarted the cluster, changed the log level to DEBUG, and raised the 
>> parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will keep these settings for a while. If the error happens again, I will 
>> bring back more information. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>> Hi all,
>> 
>> After I changed the `high-availability.zookeeper.client.session-timeout` and 
>> `maxSessionTimeout` to 120000ms, the exception still occurred.
>> 
>> Here is the log snippet. It seems this has nothing to do with the ZooKeeper 
>> client timeout, but I still don't know why the Kafka producer would be closed 
>> without any task state change.
>> 
>> ```
>> 2018-05-14 05:18:53,468 WARN  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 82828ms for sessionid 
>> 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 82828ms for sessionid 
>> 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
>> ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
>> configuration failed: javax.security.auth.login.LoginException: No JAAS 
>> configuration section named 'Client' was found in specified JAAS 
>> configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue 
>> connection to Zookeeper server without SASL authentication, if Zookeeper 
>> server allows it.
>> 2018-05-14 05:18:53,850 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
>> socket connection to server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR 
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
>> Authentication failed
>> 2018-05-14 05:18:53,853 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
>> connection established to XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
>> establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 
>> 0x305f957eb8d000a, negotiated timeout = 120000
>> 2018-05-14 05:18:53,860 INFO  
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO  
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO  
>> org.apache.kafka.clients.producer.KafkaProducer               - Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO  
>> org.apache.kafka.clients.producer.KafkaProducer               - Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task      
>>                - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> 
>> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from 
>> RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
>> before a response was received.
>>      at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>      at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>      at 
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>>      at 
>> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>>      at 
>> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>      at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>      at 
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>      at 
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>>      at 
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>>      at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>      at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>      at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>>      at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>>      at 
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>>      at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>>      at 
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>>      at 
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>>      at 
>> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>>      at 
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server 
>> disconnected before a response was received.
>> ```
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>> Hi all,
>> 
>> Recently, my Flink job hit a problem that caused it to fail and restart.
>> 
>> The log is shown in this screenshot
>> 
>> <exception.png>
>> 
>> or this
>> 
>> ```
>> 2018-05-11 13:21:04,582 WARN  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 61054ms for sessionid 
>> 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO  
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 61054ms for sessionid 
>> 0x3054b165fe2006a, closing socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO  
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>>   - State change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN  
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
>> ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO  
>> org.apache.kafka.clients.producer.KafkaProducer               - Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO  
>> org.apache.kafka.clients.producer.KafkaProducer               - Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task      
>>                - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> 
>> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from 
>> RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
>> before a response was received.
>> ```
>> 
>> The logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout 
>> value is Long.MAX_VALUE, which is logged when someone calls `producer.close()`.
>> 
>> I also saw these log messages:
>> `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
>> session timed out, have not heard from server in 61054ms for sessionid 
>> 0x3054b165fe2006a, closing socket connection and attempting reconnect`
>> and 
>> `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
>> ZooKeeper.`
>> 
>> I have checked ZooKeeper and Kafka, and there were no errors during that period.
>> I was wondering whether the TM stops its tasks when it loses its ZooKeeper 
>> connection in HA mode. Since I didn't find any documentation or mailing list 
>> thread that discusses this, I'm not sure whether this is why the Kafka 
>> producer was closed.
>> Could someone who knows HA well help? Or does someone know what happened in my job?
>> 
>> My Flink cluster version is 1.4.0 with 2 masters and 10 slaves. My ZooKeeper 
>> cluster version is 3.4.11 with 3 nodes.
>> The `high-availability.zookeeper.client.session-timeout` is the default value: 
>> 60000 ms.
>> The `maxSessionTimeout` in zoo.cfg is 40000ms.
>> I have already changed the maxSessionTimeout to 120000ms this morning.
>> 
>> This problem happened many times over the last weekend and caused my 
>> Kafka log delay to grow. Please help me. Thank you very much!
>> 
>> Best Regards,
>> Tony Wei
>> 
