[
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286380#comment-17286380
]
Jiangjie Qin edited comment on FLINK-21133 at 2/19/21, 12:41 AM:
-----------------------------------------------------------------
It looks like we are aiming at a long-term solution here. My two cents:
At a high level, any control logic in Flink that is initiated from the JM
eventually goes through one of the following two mechanisms (a sketch
contrasting them follows the list).
* independent point-to-point control (e.g. all JM-to-TM RPCs)
** This is an "out-of-band" control flow, pretty much requests / responses
between the JM and the TMs.
** In order to make sure the tasks respond to the control command, the
mailbox thread may have to be interrupted from a blocking call.
* job-graph wide coordinated / orderly control (e.g. checkpoint,
EndOfPartition)
** This is a control flow combining the "out-of-band" and "in-band" mechanisms
** The JM sends the command to the Sources via RPC (out-of-band)
** The Sources execute the command, then propagate it to the downstream
operators in the data stream (in-band)
** The downstream operators receive the command from their input data streams
(in-band)
** The JM receives the acks from all the operators and completes the control
logic (out-of-band)
** [Optional] The JM notifies all the TMs of the execution result (out-of-band)
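For illustration only, here is a minimal sketch contrasting the two paths,
assuming a simplified mailbox-style task; all class and method names
({{TaskRpcEndpoint}}, {{OperatorTask}}, {{JmGateway}}, {{Downstream}}) are
hypothetical stand-ins, not Flink's actual runtime API:
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the two control paths; not Flink's real classes.
class ControlFlowSketch {

    /** Out-of-band: a point-to-point RPC from the JM is enqueued into the
     *  task's mailbox and handled between records by the mailbox thread. */
    static class TaskRpcEndpoint {
        final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

        void onJmRpc(Runnable controlAction) {
            // If the mailbox thread is stuck in a blocking call, it may have
            // to be interrupted before it can pick this up.
            mailbox.add(controlAction);
        }
    }

    /** In-band: a barrier-like control event arrives with the data stream,
     *  is executed locally, forwarded downstream, and acked to the JM. */
    static class OperatorTask {
        final JmGateway jm;          // out-of-band ack channel back to the JM
        final Downstream downstream; // in-band channel shared with the data

        OperatorTask(JmGateway jm, Downstream downstream) {
            this.jm = jm;
            this.downstream = downstream;
        }

        void onControlEvent(long controlId) {
            execute(controlId);            // e.g. take a local snapshot
            downstream.forward(controlId); // propagate the event in-band
            jm.ack(controlId);             // acknowledge out-of-band
        }

        private void execute(long controlId) { /* ... */ }
    }

    interface JmGateway  { void ack(long controlId); }
    interface Downstream { void forward(long controlId); }
}
{code}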
When it comes to {{StopWithSavepoint}}, to me the most intuitive semantic is
that the tasks naturally stop right after taking the savepoint, without any
side effects. That means:
# The operators / tasks stop processing records right after the savepoint is
taken.
# The operators / tasks do not receive {{EndOfPartition}} / {{EndOfInput}}
because these events have already been assigned other meanings.
# {{StreamOperator.close()}} should not be invoked, to avoid confusing the
operators. Instead, {{StreamOperator.dispose()}} should be invoked directly
(a sketch of this shutdown path follows the list).
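For illustration only, a minimal sketch of that shutdown path, assuming a
simplified operator lifecycle; {{StreamOperator}} and {{OperatorChain}} here
are simplified stand-ins, not Flink's actual classes:
{code:java}
import java.util.List;

// Simplified stand-in for the operator lifecycle; not Flink's real interface.
interface StreamOperator {
    void close() throws Exception;   // end-of-input semantics; must be skipped
    void dispose() throws Exception; // release resources only, no side effects
}

class OperatorChain {
    private final List<StreamOperator> operators;

    OperatorChain(List<StreamOperator> operators) {
        this.operators = operators;
    }

    /** Regular job finish: the inputs ended, so close, then dispose. */
    void finishNormally() throws Exception {
        for (StreamOperator op : operators) op.close();
        for (StreamOperator op : operators) op.dispose();
    }

    /** Stop-with-savepoint: no end-of-input happened, so dispose directly. */
    void stopWithSavepoint() throws Exception {
        for (StreamOperator op : operators) op.dispose();
    }
}
{code}
The only difference between the two exits is whether {{close()}} runs:
{{dispose()}} alone releases resources without triggering any end-of-input
logic in the operators.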
At this point, I think the second control flow would just work, as long as we
adjust the behavior of {{StreamTask}} a little bit.
Implementation-wise, would the following work? (A sketch of these steps
follows the list.)
# The JM sends the Sources a "StopWithSavepoint" RPC.
# The Sources take a snapshot, send a checkpoint barrier with a flag
indicating stop-after-checkpointing, then stop processing data and just
block on the mailbox. This could be done either via a state machine for the
task or simply via a flag.
# The downstream tasks align the checkpoint barriers, take their own
snapshots, send the barrier to their downstream tasks, then also block on the
mailbox.
# After the JM finalizes the checkpoint, it notifies the tasks of the
completion of the checkpoint, and the tasks then exit.
# When the tasks exit, the operators are not closed but disposed directly.
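For illustration only, a minimal sketch of steps 2-5, assuming a simplified
single-threaded mailbox loop; all names are hypothetical, not Flink's actual
{{StreamTask}} API:
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a task stopping after a flagged barrier.
class StopWithSavepointTaskSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean stopping;   // set when the stop-flagged barrier is seen
    private volatile boolean terminated; // set via notifyCheckpointComplete

    /** Barrier handler (steps 2/3): snapshot, forward, stop processing data. */
    void onBarrier(long checkpointId, boolean stopAfterCheckpoint) {
        snapshotState(checkpointId);
        forwardBarrier(checkpointId, stopAfterCheckpoint);
        if (stopAfterCheckpoint) {
            stopping = true; // from now on, only block on the mailbox
        }
    }

    /** RPC from the JM once the checkpoint is finalized (step 4). */
    void notifyCheckpointComplete(long checkpointId) {
        mailbox.add(() -> terminated = true);
    }

    /** Mailbox loop: while stopping, drain only the mailbox; no data input. */
    void run() throws Exception {
        while (!terminated) {
            if (stopping) {
                mailbox.take().run();     // blocked on the mailbox only
            } else {
                processNextRecord();      // normal data path
                Runnable mail = mailbox.poll();
                if (mail != null) mail.run();
            }
        }
        disposeOperatorsWithoutClose();   // step 5: dispose, do not close
    }

    private void snapshotState(long checkpointId) { /* take local snapshot */ }
    private void forwardBarrier(long checkpointId, boolean stop) { /* emit downstream */ }
    private void processNextRecord() { /* poll input and process */ }
    private void disposeOperatorsWithoutClose() { /* dispose operators */ }
}
{code}
The key point is that once the stop flag is set, the task no longer touches
its data inputs and only reacts to mailbox actions such as the
checkpoint-complete notification.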
For the legacy sources, I agree with [~kezhuw] that special treatment might be
inevitable.
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
> Issue Type: Bug
> Components: API / Core, API / DataStream, Runtime / Checkpointing
> Affects Versions: 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Priority: Critical
> Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> failed due to timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].