[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318949#comment-15318949
 ] 

Sebastian Klemke commented on FLINK-4015:
-----------------------------------------

For our application, the producer should never drop records. Instead, it 
should determine which record failed and resend that record, together with 
all subsequent records produced to the same partition, in their original 
order. This would require the producer to keep a per-partition list of 
records that have been sent but not yet acknowledged. For other 
applications, dropping records might be acceptable.
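
To make the idea concrete, here is a minimal sketch of such a per-partition 
list of sent but unacknowledged records. It is only an illustration: 
PendingRecordBuffer and all of its method names are hypothetical and are not 
part of Flink or the Kafka client. It assumes the caller knows the target 
partition of each record and would invoke these methods from the send path 
and from the producer's completion callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not a Flink or Kafka class): tracks records that
 * were sent but not yet acknowledged, grouped by partition.
 */
final class PendingRecordBuffer<T> {

    /** A pending record tagged with a buffer-wide sequence number. */
    private static final class Pending<T> {
        final long seq;
        final T value;

        Pending(long seq, T value) {
            this.seq = seq;
            this.value = value;
        }
    }

    private final Map<Integer, Deque<Pending<T>>> pendingByPartition = new HashMap<>();
    private long nextSeq = 0;

    /** Register a record just before sending it; returns its sequence number. */
    synchronized long recordSent(int partition, T value) {
        long seq = nextSeq++;
        pendingByPartition
                .computeIfAbsent(partition, p -> new ArrayDeque<>())
                .addLast(new Pending<>(seq, value));
        return seq;
    }

    /** Drop a record once its send has been acknowledged. */
    synchronized void acknowledged(int partition, long seq) {
        Deque<Pending<T>> queue = pendingByPartition.get(partition);
        if (queue != null) {
            queue.removeIf(p -> p.seq == seq);
        }
    }

    /**
     * Remove and return the failed record and every later pending record
     * of the same partition, in the order they were originally sent.
     * The caller is expected to send them again via recordSent().
     */
    synchronized List<T> drainFrom(int partition, long failedSeq) {
        List<T> toResend = new ArrayList<>();
        Deque<Pending<T>> queue = pendingByPartition.get(partition);
        if (queue == null) {
            return toResend;
        }
        // ArrayDeque iterates head to tail, i.e. in send order.
        for (Iterator<Pending<T>> it = queue.iterator(); it.hasNext(); ) {
            Pending<T> p = it.next();
            if (p.seq >= failedSeq) {
                toResend.add(p.value);
                it.remove();
            }
        }
        return toResend;
    }

    /** Number of unacknowledged records for a partition. */
    synchronized int pendingCount(int partition) {
        Deque<Pending<T>> queue = pendingByPartition.get(partition);
        return queue == null ? 0 : queue.size();
    }
}
```

With this sketch, a failure reported for one record would lead to a call to 
drainFrom() for its partition, and the returned records would be sent again 
in the same order. Records for other partitions are left untouched. The 
sketch says nothing about how the resend interacts with requests that are 
still in flight, which a real implementation would also have to handle.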

> FlinkKafkaProducer08 fails when partition leader changes
> --------------------------------------------------------
>
>                 Key: FLINK-4015
>                 URL: https://issues.apache.org/jira/browse/FLINK-4015
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.2
>            Reporter: Sebastian Klemke
>
> When leader for a partition changes, producer fails with the following 
> exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager                       
>    - Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to 
> FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at OPERATOR.flatMap2(OPERATOR.java:82)
>       at OPERATOR.flatMap2(OPERATOR.java:16)
>       at 
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is 
> not the leader for that topic-partition.
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>       ... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
