[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283631#comment-17283631
 ] 

Stephan Ewen edited comment on FLINK-21133 at 2/16/21, 9:50 AM:
----------------------------------------------------------------

I see, the remaining problem is that exiting the mailbox loop in case of the 
source ending should call {{finishInput()}}, while exiting from {{stop()}} 
should not.

Digging through the code, there seems to be a lot of confusion and mix-up 
between cancelling, stopping, reaching the end of the stream, and reaching the 
end of the input. For example, the fact that we need to pass a flag 
{{"isStoppingBySyncSavepoint"}} to the {{"close()"}} method points to a bad 
abstraction there.

I think it makes sense to revisit this and clean it up. Similar to what 
[~kezhuw] suggested, having an explicit {{EndOfData}} event that would trigger 
{{finishInput()}} would make this cleaner.

In that case:
  - stop() would simply exit the mailbox loop; the task closes/cleans up and 
sends EndOfPartition downstream. The downstream task, once all input channels 
have seen the EndOfPartition, exits the mailbox loop in the same way.
  - reaching the end of a source, or using "stop()" with the additional flag to 
drain the pipeline, would call "finishInput()" and enqueue an EndOfData event 
before exiting the loop. Downstream tasks call "finishInput()" once they have 
seen EndOfData from all their input channels.
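
To make the two exit paths concrete, here is a rough sketch as a toy model. It 
is not Flink code: the class {{EndOfDataSketch}}, the {{Task}} type, the 
{{Event}} enum and the {{simulate}} helper are all hypothetical names made up 
for illustration, and forwarding EndOfData further downstream is an assumption 
on my side. It only shows that {{finishInput()}} runs on the drain/end-of-source 
path and never on the plain stop() path.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the proposed shutdown protocol. None of these types are Flink classes. */
final class EndOfDataSketch {

    /** Hypothetical in-band events, named after the ones in the comment above. */
    enum Event { END_OF_DATA, END_OF_PARTITION }

    /** A task with a fixed number of input channels (zero for a source). */
    static final class Task {
        final String name;
        final int numInputs;
        final List<String> log;
        Task downstream;            // null for the last task in the pipeline
        int endOfDataSeen = 0;
        int endOfPartitionSeen = 0;

        Task(String name, int numInputs, List<String> log) {
            this.name = name;
            this.numInputs = numInputs;
            this.log = log;
        }

        /** Source reached its end, or stop() was called with the drain flag. */
        void finishAndExit() {
            finishInput();
            emit(Event.END_OF_DATA);
            exitLoop();
        }

        /** Plain stop(): leave the loop without calling finishInput(). */
        void stop() {
            exitLoop();
        }

        void finishInput() {
            log.add(name + ": finishInput");
        }

        /** Close the task and tell downstream that this partition is done. */
        void exitLoop() {
            log.add(name + ": close");
            emit(Event.END_OF_PARTITION);
        }

        void emit(Event event) {
            if (downstream != null) {
                downstream.onEvent(event);
            }
        }

        /** React only once every input channel has delivered the event. */
        void onEvent(Event event) {
            if (event == Event.END_OF_DATA) {
                if (++endOfDataSeen == numInputs) {
                    finishInput();
                    emit(Event.END_OF_DATA); // assumption: forward to further tasks
                }
            } else if (++endOfPartitionSeen == numInputs) {
                exitLoop();
            }
        }
    }

    /** Two sources feeding one sink; returns the order of lifecycle calls. */
    static List<String> simulate(boolean drain) {
        List<String> log = new ArrayList<>();
        Task sink = new Task("sink", 2, log);
        for (int i = 1; i <= 2; i++) {
            Task source = new Task("source-" + i, 0, log);
            source.downstream = sink;
            if (drain) {
                source.finishAndExit();
            } else {
                source.stop();
            }
        }
        return log;
    }
}
```

Calling {{EndOfDataSketch.simulate(false)}} yields only "close" entries, while 
{{EndOfDataSketch.simulate(true)}} additionally records "finishInput" for each 
source and, once both channels have delivered EndOfData, for the sink. No flag 
has to be passed to close() in either case.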


> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)