[ https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286380#comment-17286380 ]
Jiangjie Qin edited comment on FLINK-21133 at 2/19/21, 12:41 AM:
-----------------------------------------------------------------
It looks like we are aiming for a long-term solution here. My two cents:

At a high level, any control logic initiated from the JM in Flink eventually goes through one of the following two types of mechanism.
* Independent point-to-point control (e.g. all JM-to-TM RPCs)
** This is an "out-of-band" control flow, essentially requests / responses between the JM and the TMs.
** To make sure the tasks respond to the control command, the mailbox thread may have to be interrupted from a blocking call.
* Job-graph-wide coordinated / orderly control (e.g. checkpoint, EndOfPartition)
** This is a control flow combining the "out-of-band" and "in-band" mechanisms.
** The JM sends the command to the Sources via RPC (out-of-band).
** The Sources execute the command, then propagate it to the downstream operators in the data stream (in-band).
** The downstream operators receive the command from their input data streams (in-band).
** The JM receives the acks from all the operators and completes the control logic (out-of-band).
** [Optional] The JM notifies all the TMs of the execution result (out-of-band).

When it comes to {{StopWithSavepoint}}, to me the most intuitive semantic is that the tasks naturally stop right after the savepoint is taken, without any side effects. That means:
# The operators / tasks stop processing records right after the savepoint is taken.
# The operators / tasks do not receive {{EndOfPartition}} / {{EndOfInput}}, because these events have already been assigned other meanings.
# {{StreamOperator.close()}} should not be invoked, to avoid confusing the operators. Instead, {{StreamOperator.dispose()}} should be invoked directly.

At this point, I think the second control flow would just work, as long as we adjust the behavior of {{StreamTask}} a little bit. Implementation wise, would the following work? (See the sketch after this list.)
# The JM sends the Sources a "StopWithSavepoint" RPC.
# The Sources take a snapshot, send a checkpoint barrier with a flag indicating "stop after checkpointing", and then stop processing data but keep blocking on the mailbox. This could be done either via a state machine for the task or simply a flag.
# The downstream tasks align on the checkpoint barrier, take their own snapshots, send the barrier to their downstream tasks, and then also block on the mailbox.
# After the JM finalizes the checkpoint, it notifies the tasks of the completion of the checkpoint, and the tasks then exit.
# When the tasks exit, the operators are not closed but disposed directly.

For the legacy sources, I agree with [~kezhuw] that special treatment might be inevitable.
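To make steps 2-5 a bit more concrete, here is a minimal, self-contained sketch. It is not the real Flink {{StreamTask}} / {{CheckpointBarrier}} code; the names ({{CheckpointBarrier.stopAfterCheckpoint}}, {{MailboxLetter}}, {{Operator}}, {{Task}}) are made up for illustration. It only models the parts relevant here: the barrier carries a "stop after checkpointing" flag, the task snapshots and forwards the barrier, keeps blocking on its mailbox, and after the completion notification exits by disposing (not closing) its operator.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class StopWithSavepointSketch {

    /** A barrier carrying an extra "stop after this checkpoint" flag (hypothetical, not Flink's class). */
    record CheckpointBarrier(long checkpointId, boolean stopAfterCheckpoint) {}

    /** The two mailbox letters relevant to this sketch. */
    sealed interface MailboxLetter permits BarrierReceived, CheckpointCompleted {}
    record BarrierReceived(CheckpointBarrier barrier) implements MailboxLetter {}
    record CheckpointCompleted(long checkpointId) implements MailboxLetter {}

    /** Stand-in for a stream operator: only dispose() exists here, close() is deliberately absent. */
    interface Operator {
        void snapshotState(long checkpointId);
        void dispose();
    }

    static class Task {
        private final BlockingQueue<MailboxLetter> mailbox = new LinkedBlockingQueue<>();
        private final Operator operator;
        private boolean stopping = false;
        private long stoppingCheckpointId = -1L;

        Task(Operator operator) {
            this.operator = operator;
        }

        void enqueue(MailboxLetter letter) {
            mailbox.add(letter);
        }

        /** Simplified mailbox loop: keep serving letters until the "stopping" checkpoint completes. */
        void run() throws InterruptedException {
            while (true) {
                MailboxLetter letter = mailbox.take(); // blocks on the mailbox, no record processing
                if (letter instanceof BarrierReceived received) {
                    CheckpointBarrier barrier = received.barrier();
                    operator.snapshotState(barrier.checkpointId());
                    // ... forward the barrier to the downstream tasks here (omitted) ...
                    if (barrier.stopAfterCheckpoint()) {
                        stopping = true;                               // stop processing records,
                        stoppingCheckpointId = barrier.checkpointId(); // but keep serving the mailbox
                    }
                } else if (letter instanceof CheckpointCompleted completed
                        && stopping
                        && completed.checkpointId() == stoppingCheckpointId) {
                    operator.dispose(); // dispose directly; close() is never invoked
                    return;             // the task exits without emitting EndOfPartition / EndOfInput
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task(new Operator() {
            @Override
            public void snapshotState(long checkpointId) {
                System.out.println("snapshot taken for checkpoint " + checkpointId);
            }

            @Override
            public void dispose() {
                System.out.println("operator disposed without close()");
            }
        });
        // The "JM side" of the exchange: the stop-with-savepoint barrier arrives in-band,
        // the completion notification arrives out-of-band after the checkpoint is finalized.
        task.enqueue(new BarrierReceived(new CheckpointBarrier(42L, true)));
        task.enqueue(new CheckpointCompleted(42L));
        task.run();
    }
}
{code}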
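In the sketch, "stop processing records" is modeled as a simple flag checked in the mailbox loop; as noted in step 2, a small state machine for the task would work equally well. The point of keeping the task blocked on the mailbox rather than exiting right after the snapshot is that out-of-band RPCs, in particular the checkpoint-completion notification from the JM, can still be executed by the task before it finally exits.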
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
> I have pushed branch [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case] in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} failed due to timeout.
> See also FLINK-21132 and [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].