Hi Scott,

Some more clarifications:

Doing a stop-with-savepoint will suspend the checkpoint coordinator, meaning 
that no new checkpoints will happen between taking the savepoint and shutting 
down the job. This means you will be safe from duplicates if you only use 
savepoints for this.
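
For example, using the command Scott mentions later in this thread (the 
savepoint directory and job id are placeholders):

    bin/flink cancel -s <savepoint-dir> <job-id>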

Regarding committing of the transactions: they might be committed, but they 
probably won't be, because there is no mechanism that ensures the side effects 
of completed checkpoints take effect before the job shuts down after taking 
the savepoint. The transactional sinks work like this (see the sketch below):

1) During the checkpoint, prepare (pre-commit) the transaction and notify the 
checkpoint coordinator that our part of the checkpoint is "complete".
2) Wait for the message from the checkpoint coordinator that the checkpoint is 
complete across all parallel operators.
3) Commit the transaction.

That last step is currently not guaranteed to happen when stopping with a 
savepoint. However, when restarting a job from a savepoint the sink will check 
whether there are any open transactions that should have been committed (it 
knows this because they are stored in its state) and then commits them.
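
To make those steps concrete, here is a minimal sketch of a transactional sink 
built on TwoPhaseCommitSinkFunction, the same base class the Kafka 0.11 
producer uses. The class and its in-memory "transaction" are made up for 
illustration (it just "commits" by printing), and the constructor and method 
signatures follow the 1.5-era API, so they may differ slightly in other 
versions:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class PrintingTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, PrintingTwoPhaseCommitSink.Txn, Void> {

    // The "transaction": an in-memory buffer. Open and pre-committed
    // transactions are stored in checkpointed state, which is how they
    // survive a restart and can still be committed on restore.
    public static class Txn {
        public List<String> buffered = new ArrayList<>();
    }

    public PrintingTwoPhaseCommitSink() {
        super(TypeInformation.of(Txn.class).createSerializer(new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn(); // a fresh transaction per checkpoint interval
    }

    @Override
    protected void invoke(Txn txn, String value) {
        txn.buffered.add(value); // records go into the open transaction
    }

    @Override
    protected void preCommit(Txn txn) {
        // step 1: runs as part of the checkpoint; after this the transaction
        // is in state and must remain committable, even across a failure
    }

    @Override
    protected void commit(Txn txn) {
        // step 3: runs when the checkpoint coordinator signals completion,
        // and again on restore for pre-committed transactions found in state
        txn.buffered.forEach(System.out::println);
    }

    @Override
    protected void abort(Txn txn) {
        txn.buffered.clear(); // discard a transaction that never pre-committed
    }
}

FlinkKafkaProducer011 follows this same pattern, with a Kafka transaction in 
place of the buffer.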

This works, but it is a bit fragile, so it's high on my list of things I want 
to see fixed in Flink 1.7.

Best,
Aljoscha

> On 30. Jul 2018, at 17:34, vino yang <yanghua1...@gmail.com> wrote:
> 
> Hi Scott,
> 
> For EXACTLY_ONCE on the sink end with the Kafka 0.11+ producer, the answer 
> is YES. The official documentation gives a good overview of this topic [1].
> 
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
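> 
> As a minimal sketch of how it is enabled (the constructor shown follows the 
> 1.5-era 0.11 connector API; broker address, topic, and the `stream` variable 
> are placeholders):
> 
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
> import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
> 
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "localhost:9092");
> // keep the producer's transaction timeout within the broker's
> // transaction.max.timeout.ms, as the linked docs explain
> props.setProperty("transaction.timeout.ms", "900000");
> 
> // stream is an existing DataStream<String>
> stream.addSink(new FlinkKafkaProducer011<>(
>         "my-topic",
>         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>         props,
>         FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));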
> 
> Thanks, vino.
> 
> 
> 
> 2018-07-27 22:53 GMT+08:00 Scott Kidder <kidder.sc...@gmail.com>:
> Thank you, Aljoscha! Are Kafka transactions committed when a running job has 
> been instructed to cancel with a savepoint (e.g. `flink cancel -s xxxx`)? 
> This is my primary use for savepoints. I would expect that when a new job is 
> submitted with the savepoint, as in the case of an application upgrade, Flink 
> will create a new Kafka transaction and processing will be exactly-once.
> 
> --Scott Kidder
> 
> On Fri, Jul 27, 2018 at 5:09 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> 
> this has been in the back of my head for a while now. I finally created a 
> Jira issue: https://issues.apache.org/jira/browse/FLINK-9983
> 
> In there, I also outline a better fix that will take a bit longer to 
> implement.
> 
> Best,
> Aljoscha
> 
>> On 26. Jul 2018, at 23:04, Scott Kidder <kidder.sc...@gmail.com> wrote:
>> 
>> I recently began using the exactly-once processing semantic with the Kafka 
>> 0.11 producer in Flink 1.4.2. It's been working great!
>> 
>> Are Kafka transactions committed when creating a Flink savepoint? How does 
>> this affect the recovery behavior in Flink if, before the completion of the 
>> next checkpoint, the application is restarted and restores from a checkpoint 
>> taken before the savepoint? It seems like this might lead to the Kafka 
>> producer writing a message multiple times with different committed Kafka 
>> transactions.
>> 
>> --
>> Scott Kidder
> 
> 
