[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288477#comment-17288477
 ] 

Kezhu Wang commented on FLINK-21133:
------------------------------------

[~trohrmann] Great summary!

About user-case#2, we may need to handle a failed {{notifyCheckpointComplete}};
otherwise some 2PC context could be lost during downtime (e.g. a Kafka
transaction timeout). A successful "stop-with-savepoint" should commit all side
effects so that they become visible, and users should not have to worry about
downtime. This also requires "shut down the dataflow pipeline with one
checkpoint in total". All of this applies to user-case#3 too.
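
To illustrate the concern about a failed or lost {{notifyCheckpointComplete}}, here is a minimal sketch of a two-phase-commit sink. It is a toy model, not Flink's sink API: the class {{TwoPhaseCommitSketch}} and the helper {{stopWithSavepoint}} are hypothetical, and only the method names mirror the callbacks discussed here.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy two-phase-commit sink (hypothetical, not Flink's actual API). */
class TwoPhaseCommitSketch {
    private final List<String> open = new ArrayList<>();         // current transaction
    private final List<String> preCommitted = new ArrayList<>(); // waiting for the notification
    private final List<String> visible = new ArrayList<>();      // committed side effects

    void write(String record) {
        open.add(record);
    }

    // Phase 1: the savepoint snapshot pre-commits the open transaction.
    void snapshotState() {
        preCommitted.addAll(open);
        open.clear();
    }

    // Phase 2: only the completion notification makes the data visible.
    void notifyCheckpointComplete() {
        visible.addAll(preCommitted);
        preCommitted.clear();
    }

    /** Simulates stop-with-savepoint; returns {visible count, still-pending count}. */
    static int[] stopWithSavepoint(boolean notificationDelivered) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        sink.write("b");
        sink.snapshotState();
        if (notificationDelivered) {
            sink.notifyCheckpointComplete();
        }
        // The job stops here; anything still pre-committed depends on a later restore.
        return new int[] {sink.visible.size(), sink.preCommitted.size()};
    }

    public static void main(String[] args) {
        int[] delivered = stopWithSavepoint(true);
        int[] lost = stopWithSavepoint(false);
        System.out.println("delivered: visible=" + delivered[0] + ", pending=" + delivered[1]);
        System.out.println("lost: visible=" + lost[0] + ", pending=" + lost[1]);
    }
}
```

In this model, when the notification is not delivered the pre-committed records stay pending until the job is restored, which is the window in which an external transaction could time out.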

About user-case#3, currently "stop-with-savepoint --drain" takes the snapshot
before end-of-input, so work flushed at end-of-input is not included in the
savepoint. For example, before FLINK-21132, {{AsyncWaitOperator.endInput}} was
called after the savepoint. I think what [~sewen] described as "pipeline
draining" may deserve a more destructive name, say "terminate". To behave as a
real terminal operation, the savepoint should be taken after the
end-of-input flushing.
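
The ordering problem can be sketched with a toy pipeline. This is an assumption-laden model, not Flink code: {{DrainOrderSketch}} and {{drain}} are hypothetical names, and "savepoint" here simply means "the records whose side effects the final savepoint commits at the sink".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy pipeline: a buffering operator feeding a sink that commits on savepoint. */
class DrainOrderSketch {
    /** Returns the records whose side effects the final savepoint commits. */
    static List<String> drain(boolean flushBeforeSavepoint) {
        List<String> buffered = new ArrayList<>(List.of("r1", "r2")); // held by the operator
        List<String> sinkReceived = new ArrayList<>(List.of("r0"));   // already at the sink
        List<String> savepoint = new ArrayList<>();

        if (flushBeforeSavepoint) {
            // "terminate" ordering: end-of-input flush first, savepoint last.
            sinkReceived.addAll(buffered);
            buffered.clear();
            savepoint.addAll(sinkReceived);
        } else {
            // Ordering described above: savepoint first, flush afterwards.
            savepoint.addAll(sinkReceived);
            sinkReceived.addAll(buffered); // arrives after the last savepoint
            buffered.clear();
        }
        return savepoint;
    }

    public static void main(String[] args) {
        System.out.println("flush then savepoint: " + drain(true));
        System.out.println("savepoint then flush: " + drain(false));
    }
}
```

Only the flush-then-savepoint ordering covers the records released at end-of-input, which is why the savepoint needs to come last for a terminal operation.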

About user-case#4, I agree it should not differ much from user-case#3 as long
as we don't force finished sources to be held until a checkpoint. Before the
last checkpoint, only some tasks, or even just one, may still be running and
waiting for a checkpoint to commit their side effects. I think this should be
part of FLIP-147's goal. If we are targeting a single checkpoint/savepoint for
all tasks in this case, it may not be worth it, as multiple sources could be
exhausted at different times. But I think there is value in preserving all
tasks' states after the last checkpoint/savepoint. If this is the case, I
think the last checkpoint/savepoint should be the same as in user-case#3.
{quote}If this were the case and if we could still create a checkpoint after a 
StreamOperator.close has been called, then we could simply send a 
EndOfPartitionEvent when an operator reaches the end of input or receives the
terminate call. Next, the StreamTask would only have to wait for a checkpoint 
to succeed before terminating.
{quote}
I would say it is attractive!

Currently, {{StreamOperator.close}} is used both for "flushing buffered data"
and for "cleanup resources", while FLINK-2647 proposed to *"Distinguish between
"close" (flushing buffered data) and "dispose" (cleanup resources) in streaming
operators"*.
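
A minimal sketch of why separating the two matters for late checkpoint notifications. Again this is a toy model under my own assumptions: {{CloseDisposeSketch}} is hypothetical and does not implement Flink's {{StreamOperator}}; only the method names are borrowed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy operator that separates flushing (close) from releasing resources (dispose). */
class CloseDisposeSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private final List<Long> acknowledged = new ArrayList<>();
    private boolean disposed = false;

    void process(String record) {
        buffer.add(record);
    }

    // "close": only flushes buffered data, resources stay alive.
    void close() {
        emitted.addAll(buffer);
        buffer.clear();
    }

    // Usable after close() as long as dispose() has not run yet.
    void notifyCheckpointComplete(long checkpointId) {
        if (disposed) {
            throw new IllegalStateException("resources already released");
        }
        acknowledged.add(checkpointId);
    }

    // "dispose": releases resources; no notification can be handled afterwards.
    void dispose() {
        disposed = true;
    }

    /** Returns whether a notification arriving after close() can still be handled. */
    static boolean lateNotificationSucceeds(boolean disposeBeforeNotification) {
        CloseDisposeSketch op = new CloseDisposeSketch();
        op.process("x");
        op.close();
        if (disposeBeforeNotification) {
            op.dispose();
        }
        try {
            op.notifyCheckpointComplete(1L);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("close only: " + lateNotificationSucceeds(false));
        System.out.println("close and dispose: " + lateNotificationSucceeds(true));
    }
}
```

When close and dispose are fused into one step, the second case is the only one available, so a notification after close has to be ignored or specially handled.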

Here is what I have seen:
 # FLINK-16383 ignores {{notifyCheckpointComplete}} after the operator is closed.
 # Previously, I thought {{notifyCheckpointAborted}} should not be invoked after
{{StreamOperator.close}} (FLINK-20389).
 # In FLINK-20781, {{SourceOperator}} has to deal with {{notifyCheckpointAborted}}
arriving after {{StreamOperator.close}}, when resources have already been cleaned up.

There are probably more cases that I am not aware of or was not involved in.
 * Not "allow records to be sent after the final {{notifyCheckpointComplete}} 
has been executed"
 * "we could still create a checkpoint after a StreamOperator.close has been 
called"

Sadly, both will break something.

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].


