[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282226#comment-17282226 ]

Jiangjie Qin commented on FLINK-21133:
--------------------------------------

[~mapohl] [~kezhuw], thanks for digging into the issue. The root cause you 
found sounds correct to me. A quick fix would be implementing {{finishTask()}} 
in the {{SourceOperatorStreamTask}}. 

That said, the semantics of {{finishTask()}} do not seem well defined. It would 
be good to agree on the behavior first, before we fix the issue. The current 
semantics of {{finishTask()}} are as follows:
 * For non-source tasks, this method is essentially optional and can be 
regarded as a "hook". In fact, if I understand correctly, it is not even 
guaranteed to be invoked, because the checkpoint callback may arrive after the 
task has received the {{EndOfInput}} marker from its upstream tasks. At that 
point, the task might already have exited.
 * For source tasks, this method must be implemented so that the task stops 
when a synchronous savepoint is taken. Presumably, the source task should then 
emit an {{EndOfInput}} marker downstream so that all the downstream tasks also 
exit. I am not sure whether this {{EndOfInput}} marker should differ from an 
actual {{EndOfInput}} marker, since an operator may want to behave differently 
if the stream has not actually ended. But that is a separate issue.
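To make the ordering problem for non-source tasks concrete, here is a minimal, hypothetical model in plain Java (not Flink code; {{TaskEvent}} and {{NonSourceTaskModel}} are illustrative names only). It assumes a non-source task exits only when {{EndOfInput}} arrives from upstream, and that a savepoint-completion callback arriving after the task has exited is simply dropped, so whether the hook runs depends purely on arrival order.

```java
import java.util.List;

// Hypothetical model; none of these types exist in Flink. It only illustrates
// why a finishTask()-style hook on a non-source task may never be invoked.
enum TaskEvent { END_OF_INPUT, SAVEPOINT_COMPLETE }

final class NonSourceTaskModel {
    private boolean running = true;
    private boolean finishHookInvoked = false;

    /** Processes events in arrival order; returns true if the hook ran. */
    boolean process(List<TaskEvent> events) {
        for (TaskEvent event : events) {
            if (!running) {
                // The task has already exited: late callbacks are dropped.
                break;
            }
            switch (event) {
                case SAVEPOINT_COMPLETE:
                    // Savepoint callback arrived while running: the "hook" runs,
                    // but a non-source task keeps waiting for upstream EndOfInput.
                    finishHookInvoked = true;
                    break;
                case END_OF_INPUT:
                    // Upstream has finished: the task drains and exits on its own.
                    running = false;
                    break;
            }
        }
        return finishHookInvoked;
    }
}
```

With the savepoint callback arriving first, {{process}} returns {{true}}; with {{EndOfInput}} arriving first, it returns {{false}}, which is the race described in the first bullet.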

Assuming we keep the above semantics unchanged for now, the quick fix would 
need to do the following:
 # Add a {{stop()}} method to the {{SourceOperator}}. Once it is called, the 
{{SourceOperator#emitNext()}} method will just return {{END_OF_INPUT}}. 
 # Implement the {{finishTask()}} method in the {{SourceOperatorStreamTask}} to 
stop the {{SourceOperator}}.
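A minimal sketch of the two steps above, under stated assumptions: {{SketchSourceOperator}}, {{SketchSourceOperatorStreamTask}} and the {{InputStatus}} enum below are simplified, hypothetical stand-ins loosely modeled on the classes named in the steps, not Flink's actual API. It shows only the control flow: {{finishTask()}} flips a flag, and the next {{emitNext()}} reports {{END_OF_INPUT}}, so the task loop exits even for an unbounded source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT Flink's real SourceOperator,
// SourceOperatorStreamTask or InputStatus.
enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

final class SketchSourceOperator {
    // volatile in case stop() is called from a different thread than the one
    // driving emitNext(); this sketch itself runs on a single thread.
    private volatile boolean stopped = false;
    private final Iterator<Integer> records;
    private final List<Integer> emitted = new ArrayList<>();

    SketchSourceOperator(Iterator<Integer> records) {
        this.records = records;
    }

    // Step 1: once stop() is called, emitNext() only reports END_OF_INPUT.
    void stop() {
        stopped = true;
    }

    InputStatus emitNext() {
        if (stopped || !records.hasNext()) {
            return InputStatus.END_OF_INPUT;
        }
        emitted.add(records.next()); // stands in for emitting to downstream
        return InputStatus.MORE_AVAILABLE;
    }

    List<Integer> emitted() {
        return emitted;
    }
}

final class SketchSourceOperatorStreamTask {
    private final SketchSourceOperator operator;

    SketchSourceOperatorStreamTask(SketchSourceOperator operator) {
        this.operator = operator;
    }

    // Step 2: invoked once the synchronous savepoint completes.
    void finishTask() {
        operator.stop();
    }

    // One iteration of the (simplified) processing loop; returns false once
    // the source reports END_OF_INPUT and the task should exit.
    boolean processInput() {
        return operator.emitNext() != InputStatus.END_OF_INPUT;
    }
}
```

A flag checked in {{emitNext()}} keeps the change local to the operator. How the real {{SourceOperator}} should distinguish this "stopped" end of input from a genuine one is the open question raised in the second bullet.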

CC [~sewen]

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
