Dawid Wysakowicz created FLINK-23473:
----------------------------------------

             Summary: Do not create transaction in TwoPhaseCommitSinkFunction 
after finish()
                 Key: FLINK-23473
                 URL: https://issues.apache.org/jira/browse/FLINK-23473
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / Kafka, Runtime / Checkpointing
            Reporter: Dawid Wysakowicz


Consider the following scenario:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)

What should we do now? We can of course commit all transactions up to
checkpoint 43. But should we keep waiting for `notifyCheckpointComplete(44)`?
What if another checkpoint is triggered in the meantime? We could end up
waiting indefinitely.

Our proposal is to shut down the task immediately after seeing the first
`notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER
`finish()`. This should be fine, as:
a) ideally, no new pending transactions should be opened after
checkpoint 42
b) even if the operator/function opens transactions for checkpoint 43
and checkpoint 44 (e.g. `FlinkKafkaProducer`), the transactions after
checkpoint 42 should be empty

After seeing step 5 (`notifyCheckpointComplete(43)`), it should be good
enough to:
- commit the transactions from checkpoint 42 (and 43 if they were created,
which depends on the user code)
- close the operator, aborting any pending transactions (for checkpoint 44,
if they were opened, which depends on the user code)
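
The handling proposed above could be sketched roughly as follows. This is
only an illustration, not the actual `TwoPhaseCommitSinkFunction` code: the
class `FinishedSinkSketch`, its fields, and the use of plain strings as
transaction handles are all hypothetical, and the sketch assumes that
checkpoint ids increase monotonically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a two-phase-commit sink; not Flink's real class. */
final class FinishedSinkSketch {
    private static final long NONE = -1L;

    // Pending transactions keyed by the checkpoint that pre-committed them.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();

    private boolean finished;
    private long firstCheckpointAfterFinish = NONE;
    private boolean closed;

    void finish() {
        finished = true;
    }

    /** Records the transaction pre-committed for the given checkpoint. */
    void snapshotState(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
        if (finished && firstCheckpointAfterFinish == NONE) {
            firstCheckpointAfterFinish = checkpointId;
        }
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Commit everything up to and including the completed checkpoint.
        Map<Long, String> done = pending.headMap(checkpointId, true);
        committed.addAll(done.values());
        done.clear(); // headMap is a view, so this also removes from 'pending'

        // First completed checkpoint that was triggered after finish():
        // abort whatever is still pending and shut down without waiting.
        if (finished
                && firstCheckpointAfterFinish != NONE
                && checkpointId >= firstCheckpointAfterFinish) {
            aborted.addAll(pending.values());
            pending.clear();
            closed = true;
        }
    }

    List<String> committed() { return committed; }
    List<String> aborted() { return aborted; }
    boolean isClosed() { return closed; }
}
```

Replaying the scenario from this ticket (`finish()`, checkpoints 42 to 44
triggered, then `notifyCheckpointComplete(43)`) against this sketch commits
the transactions for checkpoints 42 and 43, aborts the one for checkpoint 44,
and marks the sink as closed.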

If checkpoint 44 completes afterwards, it will still be valid. Ideally we
would recommend that after seeing `finish()` operators/functions should not
be opening any new transactions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
