[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288477#comment-17288477 ]
Kezhu Wang commented on FLINK-21133:
------------------------------------

[~trohrmann] Great summarization!

About user-case #2: we may need to handle a failed {{notifyCheckpointComplete}}, otherwise some 2pc context could be lost during downtime (e.g. a Kafka transaction timeout). A successful "stop-with-savepoint" should commit all side effects and make them visible, so that users do not have to worry about downtime. This also requires "shut down the dataflow pipeline with one checkpoint in total". All of this applies to user-case #3 too.

About user-case #3: currently "stop-with-savepoint --drain" takes the snapshot before end-of-input, so some end-of-input flushing work is not captured in the savepoint. For example, {{AsyncWaitOperator.endInput}} was called after the savepoint before FLINK-21132. I think what [~sewen] described as "pipeline draining" may deserve a more destructive name, say "terminate". To behave as a real terminal operation, the savepoint should be taken after the end-of-input flushing.

About user-case #4: I agree there should not be much difference from user-case #3, as long as we don't force finished sources to be held back before the checkpoint. Before the last checkpoint, there could be only some tasks (or even a single one) still running and waiting for a checkpoint to commit side effects. I think this should be part of FLIP-147's goal. If we are targeting one checkpoint/savepoint for all tasks in this case, it may not be worth it, since multiple sources could be exhausted at different times. But I think there is value in preserving all tasks' states after the last checkpoint/savepoint. If that is the case, I think the last checkpoint/savepoint should be the same as in user-case #3.

{quote}If this were the case and if we could still create a checkpoint after a StreamOperator.close has been called, then we could simply send an EndOfPartitionEvent when an operator reaches the end of input or receives the terminate call. Next, the StreamTask would only have to wait for a checkpoint to succeed before terminating.
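The failure mode in user-case #2 can be illustrated with a small standalone model of a two-phase-commit sink. This is not Flink's actual {{TwoPhaseCommitSinkFunction}}; the class and method names below merely mirror the checkpoint-listener callbacks for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a 2pc sink: pre-commit on snapshot, commit on notification. */
class TwoPhaseSink {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> committed = new ArrayList<>();
    private List<String> current = new ArrayList<>();

    void invoke(String record) {
        current.add(record);
    }

    /** Phase 1: stage the open transaction under the checkpoint id. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Phase 2: commit everything staged up to (and including) this id. */
    void notifyCheckpointComplete(long checkpointId) {
        pending.entrySet().removeIf(e -> {
            if (e.getKey() <= checkpointId) {
                committed.addAll(e.getValue());
                return true;
            }
            return false;
        });
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }

    List<String> committed() {
        return committed;
    }
}

public class Main {
    public static void main(String[] args) {
        TwoPhaseSink sink = new TwoPhaseSink();
        sink.invoke("a");
        sink.snapshotState(1); // savepoint taken: side effects only staged
        // If the job shuts down here, notifyCheckpointComplete(1) is never
        // delivered: the staged transaction stays pending and will hit the
        // broker's transaction timeout during downtime, losing the data.
        if (!sink.hasPending()) throw new AssertionError("expected pending txn");
        sink.notifyCheckpointComplete(1); // only a delivered notification commits
        System.out.println("committed=" + sink.committed()); // prints committed=[a]
    }
}
```

The point of the model: the side effects become visible only in the second phase, so a "stop-with-savepoint" that does not guarantee delivery (and retry on failure) of the final {{notifyCheckpointComplete}} leaves the transaction hanging.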
{quote}
I would say it is attractive! Currently, {{StreamOperator.close}} is used for both "flushing buffered data" and "cleanup of resources", while FLINK-2647 tried to claim *"Distinguish between "close" (flushing buffered data) and "dispose" (cleanup resources) in streaming operators"*. Here is what I have seen:
# FLINK-16383 ignores {{notifyCheckpointComplete}} after the operator is closed.
# Previously (FLINK-20389) I thought {{notifyCheckpointAborted}} should not be invoked after {{StreamOperator.close}}.
# In FLINK-20781, {{SourceOperator}} has to deal with {{notifyCheckpointAborted}} after {{StreamOperator.close}} because its resources have already been cleaned up.

There are probably more cases that I am not aware of or involved in. So we could either:
* not "allow records to be sent after the final {{notifyCheckpointComplete}} has been executed", or
* accept that "we could still create a checkpoint after a StreamOperator.close has been called".

Sadly, both will break something.

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
>
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)