[ https://issues.apache.org/jira/browse/FLINK-18706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169377#comment-17169377 ]

Kostas Kloudas commented on FLINK-18706:
----------------------------------------

[~Yumeng] if you are using an exactly-once sink, then the sink will only commit
data to the outside world upon completion of the final savepoint, so the
semantics will be exactly-once end-to-end. This, of course, only holds for
exactly-once sinks.
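
Concretely, for Kafka this means running the producer with
Semantic.EXACTLY_ONCE, so records are written inside a Kafka transaction that
is only committed once the checkpoint/savepoint completes, and reading the
output topic with isolation.level=read_committed. A minimal sketch of such a
sink (topic name, broker address, and the surrounding job are placeholders,
not taken from this issue):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ExactlyOnceSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5_000); // transactions commit when a checkpoint completes

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            // Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
            props.setProperty("transaction.timeout.ms", "900000");

            FlinkKafkaProducer<Long> sink = new FlinkKafkaProducer<>(
                    "topicB",                 // placeholder output topic
                    (KafkaSerializationSchema<Long>) (value, ts) -> new ProducerRecord<>(
                            "topicB", value.toString().getBytes(StandardCharsets.UTF_8)),
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

            env.fromElements(0L, 1L, 2L).addSink(sink);
            env.execute("exactly-once sink sketch");
        }
    }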

> Stop with savepoint cannot guarantee exactly-once for kafka source
> ------------------------------------------------------------------
>
>                 Key: FLINK-18706
>                 URL: https://issues.apache.org/jira/browse/FLINK-18706
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.10.1, 1.11.1
>            Reporter: Yumeng Zhang
>            Priority: Major
>              Labels: pull-request-available
>
> When I run the stop-with-savepoint command on my old job and submit a new job 
> from the resulting sync-savepoint, the new job sometimes consumes a few 
> duplicate records. Here is my case. I have a data-generation job with 
> parallelism 1 that generates long values incrementally and sends them to 
> Kafka topicA, which has only one partition. I have another consumer job with 
> parallelism 1 that reads from topicA, does no processing, and just prints the 
> numbers to standard out. For example, after stop-with-savepoint, the consumer 
> job has printed the sequence 0,1,2,3...40,41,42,43. When I start the consumer 
> job again from that sync-savepoint, it prints 41,42,43,44..., which means it 
> has consumed some records twice.
> I think the reason is that we fail to guarantee mutual exclusion, via the 
> checkpoint lock, between cancelling the source task and sending data 
> downstream: the task may still send some records downstream before the 
> sync-savepoint completes and only then be cancelled. Therefore, we need to 
> keep the source operator running in the synchronous savepoint mailbox loop 
> in the triggerCheckpoint method until the synchronous savepoint completes, 
> and the Kafka connector needs to check the running state before sending 
> data downstream. 
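
For reference, the consumer side of the scenario above fits in a few lines; a
minimal sketch (broker address, group id, and checkpoint interval are
illustrative, not the reporter's exact setup):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class ReproConsumer {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(5_000, CheckpointingMode.EXACTLY_ONCE);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "flink-18706-repro");

            env.addSource(new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props))
               .print();   // duplicates show up here after resuming from the savepoint
            env.execute("FLINK-18706 repro consumer");
        }
    }

Stopping with "bin/flink stop <jobId>" triggers the synchronous savepoint, and
resuming with "bin/flink run -s <savepointPath> ..." re-prints the tail of the
sequence when the bug is present.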
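
The guard described in the last paragraph of the report amounts to re-checking
the running flag under the checkpoint lock before every emit, so a record can
never be sent downstream after the synchronous savepoint has completed. A
minimal sketch of that interlock (the field names follow the usual
legacy-source convention; this is not the actual patch):

    // Toy illustration of the emit/cancel interlock, not Flink code.
    class GuardedEmitter {
        private final Object checkpointLock = new Object();
        private volatile boolean running = true;

        // Called once the synchronous savepoint has completed and the
        // source task is being cancelled.
        void cancel() {
            synchronized (checkpointLock) {
                running = false;
            }
        }

        // Called by the fetch loop for every record.
        void emit(long record) {
            synchronized (checkpointLock) {
                if (!running) {
                    return;   // never emit past the completed savepoint
                }
                System.out.println(record);  // stands in for sourceContext.collect(...)
                // ...the partition offset would be advanced here, atomically with
                // the emit, so snapshotted offsets match what was actually emitted
            }
        }
    }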



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
