[
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283631#comment-17283631
]
Stephan Ewen edited comment on FLINK-21133 at 2/16/21, 9:50 AM:
----------------------------------------------------------------
I see, the remaining problem is that exiting the mailbox loop in case of the
source ending should call {{finishInput()}}, while exiting from {{stop()}}
should not.
Digging through the code, there seems to be a lot of confusion and mix-up
between cancelling, stopping, reaching end of the stream, reaching the end of
the input. For example, the fact that we need to pass a flag
{{"isStoppingBySyncSavepoint"}} to the {{"close()"}} method points to a bad
abstraction there.
I think it makes sense to revisit this and clean it up. Similar to what
[~kezhuw] suggested, having an explicit {{EndOfData}} event that would trigger
{{finishInput()}} would make this cleaner.
In that case:
- stop() would simply exit the mailbox loop, the task closes/cleans up, and it
sends EndOfPartition downstream. The downstream task, once all input channels
have seen the EndOfPartition, exits the mailbox in the same way.
- reaching the end of a source, or using "stop()" with the additional flag to
drain the pipeline, would call "finishInput()" and enqueue an EndOfData event
before exiting the loop. Downstream tasks call "finishInput()" once they have
seen EndOfData from all their input channels.
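To make the two cases above concrete, here is a rough sketch of the proposed event flow as a toy model. It is not Flink code: {{EndOfStreamSketch}}, {{Task}}, {{Event}}, {{endOfSource()}} and {{runTwoSources()}} are hypothetical names made up for illustration, and the strings "finishInput", "exit" and "close" only record the order in which the corresponding steps would happen.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposal; none of these types are Flink classes.
class EndOfStreamSketch {

    /** Hypothetical in-band events; END_OF_DATA is the proposed addition. */
    enum Event { END_OF_DATA, END_OF_PARTITION }

    /** Hypothetical stand-in for a task with a mailbox loop. */
    static class Task {
        final int numInputChannels;
        final List<Task> downstream = new ArrayList<>();
        final List<String> log = new ArrayList<>();
        int endOfDataSeen = 0;
        int endOfPartitionSeen = 0;

        Task(int numInputChannels) {
            this.numInputChannels = numInputChannels;
        }

        /** stop(): exit the loop; only the draining variant finishes the input first. */
        void stop(boolean drain) {
            if (drain) {
                finishInputAndAnnounce();
            }
            exitMailboxLoop();
        }

        /** A source reaching its end behaves like a draining stop. */
        void endOfSource() {
            finishInputAndAnnounce();
            exitMailboxLoop();
        }

        /** Downstream side: act once ALL input channels delivered the event. */
        void onEvent(Event event) {
            if (event == Event.END_OF_DATA) {
                if (++endOfDataSeen == numInputChannels) {
                    finishInputAndAnnounce();
                }
            } else if (++endOfPartitionSeen == numInputChannels) {
                exitMailboxLoop();
            }
        }

        private void finishInputAndAnnounce() {
            log.add("finishInput");
            emit(Event.END_OF_DATA);
        }

        private void exitMailboxLoop() {
            log.add("exit");
            log.add("close");
            emit(Event.END_OF_PARTITION);
        }

        private void emit(Event event) {
            for (Task task : downstream) {
                task.onEvent(event);
            }
        }
    }

    /** Wires two sources into one sink and returns what the sink did. */
    static List<String> runTwoSources(boolean drain) {
        Task sourceA = new Task(0);
        Task sourceB = new Task(0);
        Task sink = new Task(2);
        sourceA.downstream.add(sink);
        sourceB.downstream.add(sink);
        if (drain) {
            sourceA.endOfSource(); // bounded source ends on its own
            sourceB.stop(true);    // stop with the drain flag
        } else {
            sourceA.stop(false);
            sourceB.stop(false);
        }
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(runTwoSources(false)); // [exit, close]
        System.out.println(runTwoSources(true));  // [finishInput, exit, close]
    }
}
```

In this model the sink never needs a flag like {{isStoppingBySyncSavepoint}}: whether it calls "finishInput" depends only on whether it saw EndOfData on every channel before EndOfPartition.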
was (Author: stephanewen):
I see, the remaining problem is that exiting the mailbox loop in case of the
source ending should call {{finishInput()}}, while exiting from {{stop()}}
should not.
Digging through the code, there seems to be a lot of confusion and mix-up
between cancelling, stopping, reaching end of the stream, reaching the end of
the input. For example, the fact that we need to pass a flag
{{"isStoppingBySyncSavepoint"}} to the {{"close()"}} method points to a bad
abstraction there.
I think it makes sense to revisit this and clean it up. Similar to what
[~kezhuw] suggested, having an explicit {{EndOfData}} event that would trigger
{{finishInput()}} would make this cleaner.
In that case
- stop() would simply exit the mailbox loop, the task closes/cleans up, and it
sends EndOfPartition downstream, which also exits the mailbox in the same
way
- reaching the end of a source, or using "stop()" with the additional flag to
drain the pipeline, would call "finishInput()" and enqueue an EndOfData event
before exiting the loop, so that downstream tasks also call "finishInput()".
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
> Issue Type: Bug
> Components: API / Core, API / DataStream, Runtime / Checkpointing
> Affects Versions: 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Priority: Critical
> Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].