Team,
One more question to the community regarding hardening Flink Apps.
Let me start off by saying we do have known Kafka bottlenecks that we are in the midst of resolving. So during certain times of day, a lot of our Flink Apps see Kafka Producer timeouts. Most of the logs are some flavor of this:
java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
Timeouts are not great, but I understand they are bound to happen (hopefully less often once the bottlenecks are resolved).
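For context, the producer sink is wired up roughly like the sketch below. This is a simplified, illustrative version rather than our actual code: the broker list, topic name, schema and property values are placeholders, and "stream" stands in for the real DataStream.

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092,broker-2:9092"); // placeholder brokers
    // As far as I understand, the 0.10 producer expires a batch once request.timeout.ms
    // (plus linger.ms) has elapsed since batch creation - that is the TimeoutException above.
    props.setProperty("request.timeout.ms", "30000");
    props.setProperty("retries", "3");
    props.setProperty("linger.ms", "5");
    props.setProperty("batch.size", "16384");

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
            "dev.-.-.-.-.trans",        // topic redacted, as in the log above
            new SimpleStringSchema(),   // serialization schema (illustrative)
            props);
    stream.addSink(producer);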
The issue for us, however, is that Flink appears to stop and restart all operators (a lot of the other operators, including Map, Reduce and Process functions, if not all of them) along with the Kafka Producers. We are processing a pretty substantial load in Flink and don't really intend to enable RocksDB/HDFS checkpointing in some of these Apps - we are okay with sustaining some data loss when an App crashes completely or something along those lines. However, what we are noticing here is that all the data held in memory for the sliding window functions is also lost completely because of this. I would have thought that, given the retry settings in the Kafka Producer, even those 28 queued records should have been recovered, let alone the million-plus events in memory state waiting to be Folded/Reduced for the sliding window. This doesn't feel right :)
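If I am reading FlinkKafkaProducerBase correctly, there is a setLogFailuresOnly flag that makes the sink log async send failures instead of throwing from checkErroneous() and failing the task. Is that the intended escape hatch here? A rough, untested sketch (building on the hypothetical producer above; it obviously trades the restart for silently dropped records):

    // Hypothetical tweak to the producer sketched earlier:
    producer.setLogFailuresOnly(true);    // log failed sends instead of failing the task
    producer.setFlushOnCheckpoint(false); // we are not checkpointing these Apps anyway
    stream.addSink(producer);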
Is the only way to solve this to enable RocksDB/HDFS checkpointing? Why would almost the entire Job Graph restart on a single operator timeout? Do I need to do something simple like disabling operator chaining? We really are trying to use only memory state, and nothing else, for these heavy-hitting streams.
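If it helps frame the question, the environment-level knobs we are considering look roughly like this (illustrative values; I am not sure disabling chaining actually limits the restart scope, which is part of what I am asking):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Break the chain between the window operators and the Kafka sink.
    env.disableOperatorChaining();
    // Restart with a small delay instead of the default strategy.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                               // restart attempts
            Time.of(10, TimeUnit.SECONDS))); // delay between attempts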
Thanks for your help,
Ashish
