[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857831#comment-15857831
 ] 

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3278
  
    Thanks for the detailed review @tillrohrmann, I'll follow up and address 
your comments.
    
    Regarding removing `setFlushOnCheckpoint`:
    I think it was added at first to provide flexibility for users who know 
what they are doing, and to make sure that the producer will be able to work in 
all environments (see comments in #2108).
    
    However, recently I've also gathered opinions (from you and others) that 
the setting overcomplicates at-least-once guarantees for the producer, and I 
have the feeling we can remove it starting from the next release.
    
    There is FLINK-5728 to enable flushing by default (currently the default is 
no flushing). I'll incorporate your opinion on this into that JIRA, and decide 
there whether we only want to disable it by default or remove it completely.



> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint succeeded only if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before flushing and after flushing, once 
> {{pendingRecords}} has reached 0.
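
The quick fix described in the issue could look roughly like the following Java sketch. It is not the actual FlinkKafkaProducer code: the class name, the `onSendCompleted` callback, the `flush` parameter and `checkErroneous` are hypothetical stand-ins, and only `pendingRecords`, `asyncException`, `invoke` and `snapshotState` come from the issue text.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the producer sink, for illustration only. */
class CheckpointSafeProducerSketch {
    // Incremented on each invoke(), decremented on each send callback.
    private final AtomicLong pendingRecords = new AtomicLong();
    // Set by a send callback when an asynchronous send fails.
    private volatile Exception asyncException;

    void invoke(String record) {
        checkErroneous();
        pendingRecords.incrementAndGet();
        // A real sink would hand the record to the Kafka client here.
    }

    /** Simulates the client's send callback; error is null on success. */
    void onSendCompleted(Exception error) {
        if (error != null && asyncException == null) {
            asyncException = error;
        }
        pendingRecords.decrementAndGet();
    }

    /** flush is assumed to block until all outstanding callbacks have fired. */
    void snapshotState(Runnable flush) {
        checkErroneous(); // failures reported before this checkpoint started
        flush.run();
        if (pendingRecords.get() != 0) {
            throw new IllegalStateException(
                "Pending records after flush: " + pendingRecords.get());
        }
        checkErroneous(); // failures reported by callbacks during the flush
    }

    private void checkErroneous() {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null; // report each failure once
            throw new IllegalStateException("Failed to send data to Kafka", e);
        }
    }
}
```

With only the `pendingRecords` check, a record whose callback reports an error still decrements the counter, so the checkpoint would be treated as successful even though the record was never written. The second `checkErroneous()` call is what turns that case into a checkpoint failure.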



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
