Hi Ninad,

I think Gordon could shed more light on this, but I suggest you
update your Flink version to at least 1.2.

The reason is that we are in the process of releasing Flink 1.3
(which will probably come out today), and a lot has been changed,
fixed, and improved since the 1.1 release. In fact, it would help us
a lot if you could check whether your problem still exists in the
upcoming 1.3 release.

In addition, I suppose that the 1.1 release will soon no longer be
supported.
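
As a side note, your stack trace ends in
FlinkKafkaProducerBase.snapshotState calling checkErroneous, which
suggests that the checkpoint failed because an asynchronous send had
failed. Below is a minimal, Flink-free sketch of that pattern as I
understand it. It is only a model: AsyncSink, setBrokerUp, flush and
delivered are hypothetical names made up for illustration, not Flink
or Kafka APIs, and the real producer is more involved.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, Flink-free model of a sink that sends asynchronously and
// flushes on checkpoint. None of these names are Flink or Kafka APIs.
class FlushOnCheckpointSketch {

    static class AsyncSink {
        private final Queue<String> pending = new ArrayDeque<>();
        private Exception asyncError;     // first failure reported by a "callback"
        private boolean brokerUp = true;  // simulated broker availability
        private int delivered = 0;

        void setBrokerUp(boolean up) { brokerUp = up; }

        int delivered() { return delivered; }

        // Buffer the record; a real producer would hand it to a background sender.
        void send(String record) throws Exception {
            checkErroneous();
            pending.add(record);
        }

        // Drain the buffer, recording the first failure instead of throwing,
        // the way an asynchronous send callback would.
        private void flush() {
            while (!pending.isEmpty()) {
                pending.poll();
                if (brokerUp) {
                    delivered++;
                } else if (asyncError == null) {
                    asyncError = new Exception("Batch Expired");
                }
            }
        }

        // Rethrow a failure that was recorded asynchronously.
        private void checkErroneous() throws Exception {
            if (asyncError != null) {
                Exception e = asyncError;
                asyncError = null;
                throw new Exception("Failed to send data: " + e.getMessage(), e);
            }
        }

        // Called when a checkpoint is taken: nothing may remain in flight,
        // and any failed send must fail the checkpoint.
        void snapshotState() throws Exception {
            flush();
            checkErroneous();
        }
    }

    public static void main(String[] args) {
        AsyncSink sink = new AsyncSink();
        try {
            sink.send("window-1");
            sink.setBrokerUp(false);  // simulate all brokers going down
            sink.snapshotState();
            System.out.println("checkpoint completed");
        } catch (Exception e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

In this model a failed send makes the checkpoint fail instead of
letting it complete with records still undelivered. If the job then
restarts from the last successful checkpoint, those records should be
replayed rather than lost, which is why it would be interesting to see
whether you still observe the loss on a newer version.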

Cheers,
Kostas

> On Jun 1, 2017, at 12:15 AM, ninad <nni...@gmail.com> wrote:
> 
> Thanks for the fix guys. I am trying to test this with 1.1.5, but I am
> still seeing data loss. I am not able to get much from the logs beyond
> what is shown below.
> 
> Here's our use case:
> 
> 1) Consume from Kafka
> 2) Apply session window
> 3) Send messages of window to Kafka
> 
> If there's a failure in step 3 because all Kafka brokers are down, we see
> data loss.
> 
> Here are relevant logs:
> 
> java.lang.Exception: Could not perform checkpoint 2 for operator
> TriggerWindow(ProcessingTimeSessionWindows(30000),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
>       at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
>       at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
>       at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
>       at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
>       at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th
> operator in chain.
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
>       ... 8 more
> Caused by: java.lang.Exception: Failed to snapshot function state of
> TriggerWindow(ProcessingTimeSessionWindows(30000),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>       at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
>       at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
>       ... 9 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
>       at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
>       at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13412.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
