[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285367#comment-17285367
 ] 

Stephan Ewen commented on FLINK-21133:
--------------------------------------

The complexity of adding a new {{EndOfData}} event, versus a flag to the the 
{{EndOfPartition}} event seem to be pretty similar, because the complexity lies 
in the book-keeping and how to handle the information.

What I really like is the idea to change the definitions slightly so that we do 
not have to distinguish these cases at all, and that {{EndOfPartition}} and 
{{EndOfData}} is always the same, and we need no extra frags.


h3. Stop-with-Savepoint, no draining

This simple variant (the _"suspend case"_) could just become a "cancel with 
savepoint", as [~kezhuw] mentioned.
There is no need to cleanly shut down streams. After the 
"{{notifyCheckpointComplete()}}" call, the action just throws a 
{{CancelTaskException}} and the pipeline shuts down.

That means no {{EndOfPartition}} would be involved, and "{{close()}}" and 
"{{finishInput()}}" would not be called. So we could get rid of all the special 
cases.

Maybe this needs some JM side work, meaning supporting that the scheduler goes 
into "cancelling" state without actually triggering the "cancel()" on the 
tasks, yet. And that it goes into global recovery if a task fails in that state.
That would also be a clean way to handle 
https://issues.apache.org/jira/browse/FLINK-21030 (issue around "stop with 
savepoint" and regional recovery ([~trohrmann])

h3. h3. Stop-with-Savepoint, with pipeline draining

This looks like the same thing as the feature that shuts down a bounded stream 
with a checkpoint.
The only difference is that mailbox on the sources exits its default action 
(reading from the sources) due to the "stop()" RPC call, rather than because of 
the "EndOfInput" status. Here everything shuts down with {{EndOfPartition}} 
which is also simultaneously {{EndOfData}}. The {{finishInput()}} and 
{{close()}} methods actually get called.

To support specifying a savepoint for that shutdown, we would need to be able 
to shut down the dataflow pipeline with one checkpoint in total. That would be 
interesting input for the 
[FLIP-147|https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished]
 discussion.


> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]..



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to