The reorder issue can be resolved by setting max.in.flight.requests.per.connection
to 1, if we are talking pure Kafka producer configs (and I believe they carry over
to the Flink Kafka connector). This does limit concurrency (at the TCP level) once
Kafka is back up, but that is not very limiting once the producer's batch.size and
linger.ms configurations are understood and tuned appropriately.
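
For reference, a rough sketch of how these settings, together with the "retries"
option suggested below, could be passed to the 0.10 connector that appears in the
stack trace further down. The broker address, topic name, and numeric values are
placeholders, not recommendations:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092");            // placeholder
props.setProperty("retries", "2147483647");                        // retry instead of expiring batches
props.setProperty("max.in.flight.requests.per.connection", "1");   // preserve ordering despite retries
props.setProperty("batch.size", "65536");                          // placeholder, tune for your load
props.setProperty("linger.ms", "50");                               // placeholder, tune for your load

// FlinkKafkaProducer010(topic, serializationSchema, producerConfig)
FlinkKafkaProducer010<String> sink =
        new FlinkKafkaProducer010<>("dev.example.topic", new SimpleStringSchema(), props);

With max.in.flight.requests.per.connection set to 1, retries cannot reorder records
within a partition, at the cost of a single outstanding request per broker connection.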

On Thu, Jan 25, 2018 at 11:41 AM, Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> Try setting the Kafka producer config option for number of retries
> ("retries") to a large number, to avoid the timeout.  It defaults to zero.
> Do note that retries may result in reordered records.
>
> On Wed, Jan 24, 2018 at 7:07 PM, Ashish Pokharel <ashish...@yahoo.com>
> wrote:
>
>> Fabian,
>>
>> Thanks for your feedback - very helpful as usual!
>>
>> This is sort of becoming a huge problem for us right now because of our
>> Kafka situation. For some reason I missed this detail going through the
>> docs. We are definitely seeing a heavy dose of data loss when Kafka
>> timeouts are happening.
>>
>> We are actually on version 1.4 - I’d be interested to understand if
>> anything can be done in 1.4 to prevent this scenario.
>>
>> One other thought I had was an ability to invoke “Checkpointing before
>> Restart / Recovery” -> meaning I don’t necessarily need to checkpoint
>> periodically, but I do want to make sure that on an explicit restart /
>> rescheduling like this, we do have a decent “last known” state. Not sure if
>> this is currently doable.
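
As far as I know, 1.4 does not have an on-demand "checkpoint right before restart"
hook; the closest approximation I can think of is infrequent periodic checkpointing
combined with externalized checkpoints that are retained on cancellation, so an
explicit cancel-and-resubmit can resume from the last snapshot. A rough sketch, with
an arbitrary interval and a placeholder checkpoint path:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Infrequent periodic checkpoints: cheap, but a reasonably recent
// "last known" state exists whenever the job is restarted or rescheduled.
env.enableCheckpointing(15 * 60 * 1000L);  // every 15 minutes -- arbitrary

// Keep the latest checkpoint around when the job is cancelled for an explicit
// restart, so it can be resumed from with "flink run -s <checkpoint path>".
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Placeholder URI; heap state stays in memory, snapshots are written out as files.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));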
>>
>> Thanks, Ashish
>>
>> On Jan 23, 2018, at 5:03 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Ashish,
>>
>> Originally, Flink always performed full recovery in case of a failure,
>> i.e., it restarted the complete application.
>> There is some ongoing work to improve this and make recovery more
>> fine-grained (FLIP-1 [1]).
>> Some parts have been added for 1.3.0.
>>
>> I'm not familiar with the details, but Stefan (in CC) should be able to
>> answer your specific question.
>>
>> Best, Fabian
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
>>
>> 2018-01-19 20:59 GMT+01:00 ashish pok <ashish...@yahoo.com>:
>>
>>> Team,
>>>
>>> One more question to the community regarding hardening Flink Apps.
>>>
>>> Let me start off by saying we do have known Kafka bottlenecks which we
>>> are in the midst of resolving. So during certain times of day, a lot of our
>>> Flink Apps are seeing Kafka Producer timeout issues. Most of the logs are
>>> some flavor of this:
>>>
>>> java.lang.Exception: Failed to send data to Kafka: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s) for dev.-.-.-.-.trans-2: 5651 ms has passed since batch creation plus linger time
>>>
>>> Timeouts are not necessarily good, but I am sure we understand this is
>>> bound to happen (hopefully less often).
>>>
>>> The issue for us, however, is that it almost looks like Flink is stopping and
>>> restarting all operators (a lot of other operators including Map, Reduce
>>> and Process functions, if not all of them) along with the Kafka Producers. We are
>>> processing a pretty substantial load in Flink and don't really intend to
>>> enable Rocks/HDFS checkpointing in some of these Apps - we are OK with
>>> sustaining some data loss when an App crashes completely or something along those
>>> lines. However, what we are noticing here is that all the data held in
>>> memory for the sliding window functions is also lost completely because of
>>> this. I would have thought that, because of the retry settings in the Kafka Producer,
>>> even those 28 events in the queue should have been recovered, let alone the over a
>>> million events in Memory State waiting to be Folded/Reduced for the sliding
>>> window. This doesn't feel right :)
>>>
>>> Is the only way to solve this to create a Rocks/HDFS checkpoint? Why
>>> would almost the whole Job Graph restart on an operator timeout? Do I need to do
>>> something simple like disable Operator chaining? We really, really are
>>> trying to use just Memory and not any other state for these heavy-hitting
>>> streams.
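
On the restart scope: as far as I understand, a streaming job with pipelined data
exchanges in 1.4 is effectively one failover region, so a single task failure (here
the producer throwing in checkErroneous) restarts the whole job graph, and disabling
operator chaining would not change that. Without checkpointing, the restarted
operators come up empty, which is why the window contents disappear. If RocksDB/HDFS
is off the table, one middle ground is heap state with the memory state backend plus
a restart strategy; a sketch under the assumption that the windowed state is modest
and the JobManager itself stays up, with arbitrary sizes and intervals:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap state, snapshotted into JobManager memory -- no RocksDB or HDFS involved.
// Only workable for modest state sizes and while the JobManager survives.
env.setStateBackend(new MemoryStateBackend(64 * 1024 * 1024));  // 64 MB snapshot cap, arbitrary
env.enableCheckpointing(60 * 1000L);                            // arbitrary interval

// Restart automatically on producer timeouts instead of failing the job for good.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10 * 1000L));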
>>>
>>> Thanks for your help,
>>>
>>> Ashish
>>>
>>
>>
>>
>
