[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282226#comment-17282226 ]
Jiangjie Qin commented on FLINK-21133:
--------------------------------------

[~mapohl] [~kezhuw], thanks for digging into the issue. The root cause you found sounds correct to me. A quick fix would be to implement {{finishTask()}} in the {{SourceOperatorStreamTask}}.

That said, the semantics of {{finishTask()}} do not seem well defined. It would be good if we could first agree on the behavior before we fix the issue. The current semantics of {{finishTask()}} are as follows.
* For non-source tasks, this method is essentially optional and can be regarded as a "hook". In fact, it is not even guaranteed to be invoked, if I understand correctly, because the checkpoint callback may come after the task has received the {{EndOfInput}} marker from upstream tasks. At that point, the task might already have exited.
* For source tasks, this method must be implemented in order to let the task stop when a synchronous savepoint is taken. Supposedly, a source task should send an {{EndOfInput}} marker from the source to the downstream tasks so that all the tasks exit. I am not sure whether this {{EndOfInput}} marker should be different from an actual {{EndOfInput}} marker, as an operator may want to behave differently if the stream has not actually ended. But this is a separate issue.

Assuming we keep the above semantics unchanged for now, the quick fix may need to do the following:
# Add a {{stop()}} method to the {{SourceOperator}}. Once it is called, the {{SourceOperator#emitNext()}} method will just return {{END_OF_INPUT}}.
# Implement the {{finishTask()}} method in the {{SourceOperatorStreamTask}} to stop the {{SourceOperator}}.

CC [~sewen]

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
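
For illustration, the two-step quick fix proposed in the comment above (a {{stop()}} flag on the operator, set from {{finishTask()}}) might look roughly like the sketch below. It is a self-contained toy model, not Flink code: {{FinishTaskSketch}}, {{SketchSourceOperator}}, {{SketchSourceTask}} and the {{Status}} enum are hypothetical stand-ins, and the real {{SourceOperator}} and {{SourceOperatorStreamTask}} have different signatures and much more state.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class FinishTaskSketch {

    /** Stand-in for the status reported by emitNext(); not Flink's own type. */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Hypothetical, heavily simplified source operator. */
    static final class SketchSourceOperator {
        private final Iterator<String> records;
        private boolean stopped;

        SketchSourceOperator(List<String> records) {
            this.records = records.iterator();
        }

        /** Step 1: once called, emitNext() only ever reports END_OF_INPUT. */
        void stop() {
            stopped = true;
        }

        /** Emits at most one record into the given output list. */
        Status emitNext(List<String> output) {
            if (stopped || !records.hasNext()) {
                return Status.END_OF_INPUT;
            }
            output.add(records.next());
            return Status.MORE_AVAILABLE;
        }
    }

    /** Hypothetical, heavily simplified source task. */
    static final class SketchSourceTask {
        private final SketchSourceOperator operator;

        SketchSourceTask(SketchSourceOperator operator) {
            this.operator = operator;
        }

        /** Step 2: the finishTask() hook stops the operator. */
        void finishTask() {
            operator.stop();
        }
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        SketchSourceOperator operator =
                new SketchSourceOperator(List.of("a", "b", "c", "d"));
        SketchSourceTask task = new SketchSourceTask(operator);

        operator.emitNext(output); // emits "a"
        operator.emitNext(output); // emits "b"
        task.finishTask();         // e.g. triggered by a synchronous savepoint

        // The source still holds "c" and "d", but the operator reports the end.
        System.out.println(operator.emitNext(output));
        System.out.println(output);
    }
}
```

In this toy model the unread records are simply left behind. Whether a stopped source should signal the same {{END_OF_INPUT}} as a genuinely finished one is the open question raised in the comment, and the sketch does not try to answer it.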