[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286380#comment-17286380
 ] 

Jiangjie Qin commented on FLINK-21133:
--------------------------------------

It looks like we are aiming for a long-term solution here. My two cents:

At a high level, any control logic in Flink that is initiated from the JM 
eventually falls into one of the following two mechanisms:
 * independent point-to-point control (e.g. all JM-to-TM RPCs)
 ** This is an "out-of-band" control flow, essentially requests / responses 
between the JM and the TMs.
 ** To make sure the tasks respond to the control command, the mailbox thread 
may have to be interrupted from a blocking call.
 * job-graph-wide coordinated / ordered control (e.g. checkpoint, 
EndOfPartition)
 ** This is a control flow combining "out-of-band" and "in-band" mechanisms.
 ** The JM sends the command to the Sources via RPC (out-of-band).
 ** The Sources execute the command, then propagate it to the downstream 
operators in the data stream (in-band).
 ** The downstream operators receive the command from their input data streams 
(in-band).
 ** The JM receives the acks from all the operators and completes the control 
logic (out-of-band).
 ** [Optional] The JM notifies all the TMs about the execution result 
(out-of-band).
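To make the second mechanism concrete, here is a toy, single-threaded model of it. Everything in it ({{Coordinator}}, {{Source}}, {{Downstream}}, {{Element}}) is hypothetical and only mimics the roles described above; it is not how Flink's JM or tasks are actually implemented. What it illustrates is the ordering: the command reaches the source out-of-band, travels in-band behind the records emitted before it, and the JM completes only after every task has acked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Toy model of the coordinated (out-of-band + in-band) control flow.
// All classes are hypothetical stand-ins for the JM, a source task and a downstream task.
class CoordinatedControlSketch {

    // A stream element: either a data record or an in-band control marker (barrier).
    static final class Element {
        final String record;  // null for a barrier
        final long barrierId; // only meaningful for a barrier
        Element(String record, long barrierId) { this.record = record; this.barrierId = barrierId; }
        boolean isBarrier() { return record == null; }
    }

    // Plays the JM: triggers sources out-of-band and collects acks out-of-band.
    static final class Coordinator {
        final Set<String> pendingAcks = new LinkedHashSet<>();
        final List<String> log = new ArrayList<>();

        void trigger(long id, List<Source> sources, List<String> allTasks) {
            pendingAcks.addAll(allTasks);
            for (Source s : sources) {
                s.onControlRpc(id); // out-of-band: JM -> source
            }
        }

        void ack(String task, long id) { // out-of-band: task -> JM
            pendingAcks.remove(task);
            log.add("ack " + task + " " + id);
            if (pendingAcks.isEmpty()) {
                log.add("complete " + id);
            }
        }
    }

    static final class Source {
        final String name; final Queue<Element> out; final Coordinator jm;
        Source(String name, Queue<Element> out, Coordinator jm) { this.name = name; this.out = out; this.jm = jm; }

        void emit(String record) { out.add(new Element(record, -1)); }

        void onControlRpc(long id) {
            // Execute the command locally (e.g. snapshot), propagate it in-band, then ack.
            out.add(new Element(null, id));
            jm.ack(name, id);
        }
    }

    static final class Downstream {
        final String name; final Queue<Element> in; final Coordinator jm;
        final List<String> seen = new ArrayList<>();
        Downstream(String name, Queue<Element> in, Coordinator jm) { this.name = name; this.in = in; this.jm = jm; }

        void drain() {
            Element e;
            while ((e = in.poll()) != null) {
                if (e.isBarrier()) {
                    seen.add("barrier " + e.barrierId);
                    jm.ack(name, e.barrierId); // received in-band, acked out-of-band
                } else {
                    seen.add(e.record);
                }
            }
        }
    }

    static Downstream demo() {
        Coordinator jm = new Coordinator();
        Queue<Element> channel = new ArrayDeque<>();
        Source source = new Source("source", channel, jm);
        Downstream map = new Downstream("map", channel, jm);
        source.emit("a");
        source.emit("b");
        jm.trigger(1L, Collections.singletonList(source), Arrays.asList("source", "map"));
        source.emit("c"); // emitted after the barrier
        map.drain();
        return map;
    }
}
```

Running {{demo()}}, the map task sees a and b before the barrier and c after it, and the JM log only reaches "complete 1" once both the source and the map have acked.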

When it comes to {{StopWithSavePoint}}, to me the most intuitive semantics is 
that the tasks stop naturally right after the savepoint, without any side 
effects. That means:
 # The operators / tasks stop processing records right after the savepoint is 
taken.
 # The operators / tasks do not receive {{EndOfPartition}} / {{EndOfInput}}, 
because these events have already been assigned other meanings.
 # {{StreamOperator.close()}} should not be invoked, as it would confuse the 
operators. Instead, {{StreamOperator.dispose()}} should be invoked directly.
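A minimal sketch of the proposed lifecycle difference, assuming a hypothetical {{RecordingOperator}} that only records which hooks are called. The method names mirror the ones mentioned above ({{endInput}} stands in for the {{EndOfInput}} signal), but this is not Flink's {{StreamOperator}} interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator lifecycle; names mirror the discussion, not Flink's StreamOperator.
class StopSemanticsSketch {

    static final class RecordingOperator {
        final List<String> calls = new ArrayList<>();
        void processElement(String record) { calls.add("process " + record); }
        void snapshotState(long checkpointId) { calls.add("snapshot " + checkpointId); }
        void endInput() { calls.add("endInput"); } // stands in for the EndOfInput signal
        void close() { calls.add("close"); }       // "all input consumed, finish up"
        void dispose() { calls.add("dispose"); }   // release resources only
    }

    // Bounded input really ended: the operator learns about it and is closed.
    static void finishBoundedInput(RecordingOperator op) {
        op.endInput();
        op.close();
        op.dispose();
    }

    // Proposed stop-with-savepoint: snapshot, stop, then dispose directly.
    static void stopWithSavepoint(RecordingOperator op, long savepointId) {
        op.snapshotState(savepointId);
        // No endInput()/close(): the input has not actually ended,
        // so the operator must not behave as if it had.
        op.dispose();
    }
}
```

For an operator that processed one record and was then stopped with savepoint 7, the recorded calls are "process x", "snapshot 7", "dispose"; the bounded-input path instead records "endInput", "close", "dispose".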

With these semantics, I think the second control flow would just work, as long 
as we adjust the behavior of {{StreamTask}} a little bit.

Implementation-wise, would the following work?
 # The JM sends a "StopWithSavepoint" RPC to the Sources.
 # The Sources take a snapshot, send a checkpoint barrier with a flag 
indicating stop-after-checkpoint, then stop processing data and just block on 
the mailbox. This could be done either via a state machine for the task or 
simply with a flag.
 # The downstream tasks align the checkpoint barrier, take their own snapshots, 
send the barrier to their downstream tasks, then also block on the mailbox.
 # After the JM finalizes the checkpoint, it notifies the tasks of the 
completion of the checkpoint, and the tasks then exit.
 # When the tasks exit, the operators are not closed but disposed directly.
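Steps 2 to 5 could be sketched for a single task roughly as follows, assuming a hypothetical {{StopWithSavepointTaskSketch}} with a separate in-band input queue and an out-of-band mailbox, using the state-machine variant (a plain flag would work too). Barrier alignment across multiple inputs is left out. This is not Flink's {{StreamTask}} or mailbox implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-task sketch of stop-with-savepoint; not Flink's StreamTask.
class StopWithSavepointTaskSketch {

    enum State { RUNNING, WAITING_FOR_CHECKPOINT_COMPLETE, FINISHED }

    // In-band barrier carrying the "stop after checkpointing" flag.
    static final class Barrier {
        final long checkpointId;
        final boolean stopAfterCheckpoint;
        Barrier(long checkpointId, boolean stopAfterCheckpoint) {
            this.checkpointId = checkpointId;
            this.stopAfterCheckpoint = stopAfterCheckpoint;
        }
    }

    final BlockingQueue<Object> input = new LinkedBlockingQueue<>();     // records and barriers
    final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>(); // out-of-band control
    final List<String> events = new ArrayList<>(); // written only by the task thread
    private volatile State state = State.RUNNING;
    private long stopCheckpointId = -1;            // accessed only by the task thread

    void run() throws InterruptedException {
        while (state != State.FINISHED) {
            Runnable mail = mailbox.poll();
            if (mail != null) {
                mail.run();
            } else if (state == State.RUNNING) {
                Object next = input.poll(10, TimeUnit.MILLISECONDS);
                if (next instanceof Barrier) {
                    onBarrier((Barrier) next);
                } else if (next != null) {
                    events.add("process " + next);
                }
            } else {
                // Stopped after the savepoint: input is no longer read, only mails.
                mailbox.take().run();
            }
        }
    }

    private void onBarrier(Barrier barrier) {
        events.add("snapshot " + barrier.checkpointId);
        events.add("forward barrier " + barrier.checkpointId); // to downstream tasks
        if (barrier.stopAfterCheckpoint) {
            stopCheckpointId = barrier.checkpointId;
            state = State.WAITING_FOR_CHECKPOINT_COMPLETE;
        }
    }

    // Called from another thread when the JM reports the checkpoint as complete.
    void notifyCheckpointComplete(long checkpointId) {
        mailbox.add(() -> {
            if (state == State.WAITING_FOR_CHECKPOINT_COMPLETE && checkpointId == stopCheckpointId) {
                events.add("dispose"); // exit without close()
                state = State.FINISHED;
            }
        });
    }

    State state() { return state; }

    // Drives one task through the protocol; the JM side is simulated inline.
    static List<String> demo() throws InterruptedException {
        StopWithSavepointTaskSketch task = new StopWithSavepointTaskSketch();
        Thread taskThread = new Thread(() -> {
            try {
                task.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskThread.start();
        task.input.add("a");
        task.input.add(new Barrier(42L, true));
        // A real JM would wait for all acks; here we just wait until the task has stopped.
        while (task.state() != State.WAITING_FOR_CHECKPOINT_COMPLETE) {
            Thread.sleep(5);
        }
        task.notifyCheckpointComplete(42L);
        taskThread.join(); // join() makes the task thread's writes to events visible
        return task.events;
    }
}
```

Once the stop barrier is handled, the loop never touches {{input}} again and only blocks on {{mailbox.take()}}; the completion notification is the only thing that lets the task dispose and exit.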

For the legacy sources, I agree with [~kezhuw] that special treatment might be 
inevitable.

> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
