[
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285209#comment-17285209
]
Piotr Nowojski edited comment on FLINK-21133 at 2/16/21, 2:04 PM:
------------------------------------------------------------------
As I mentioned in FLINK-21132, in principle I also think that replacing
{{isStoppingBySyncSavepoint}} with an event is a cleaner solution at the
conceptual level.
One extra complication is the [closing
handshake|https://github.com/apache/flink/pull/14831#issuecomment-774642652]
that we are probably going to introduce as part of FLINK-21086. It will also
mean that we need to support unaligned checkpoints overtaking the current
{{EndOfPartitionEvent}}. Overtaking also means persisting the event as part of
the checkpoint and potentially restoring it later.
From this perspective, I would lean more towards adding a boolean flag to the
{{EndOfPartitionEvent}}, as opposed to adding a new {{EndOfData}} event.
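A minimal sketch of the two event shapes being compared, assuming simplified stand-ins: {{EndOfPartitionWithFlag}}, {{EndOfDataMarker}} and the {{dataFinished}} flag are hypothetical names, not Flink's actual event classes, and the serialization real events need is omitted. One plausible reason to prefer the flag is visible here: a single event carries both facts, so only one event type would have to be overtaken, persisted and restored by unaligned checkpoints.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.

// Option A: the end-of-partition event enriched with a boolean flag.
final class EndOfPartitionWithFlag {
    // true:  the upstream task genuinely ran out of data, so endOfInput should fire
    // false: the upstream task was stopped (e.g. stop-with-savepoint), no endOfInput
    private final boolean dataFinished;

    EndOfPartitionWithFlag(boolean dataFinished) {
        this.dataFinished = dataFinished;
    }

    boolean isDataFinished() {
        return dataFinished;
    }
}

// Option B: a separate stateless marker, sent before the plain end-of-partition
// event only when the upstream task genuinely ran out of data. Receivers then
// have to combine two event types per channel to reach the same conclusion.
final class EndOfDataMarker {
    static final EndOfDataMarker INSTANCE = new EndOfDataMarker();

    private EndOfDataMarker() {}
}
```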
However, the more I think about it, the less convinced I am that the added
complexity of {{EndOfData}} (or of enriching {{EndOfPartitionEvent}} with a
flag) is worth it. It looks to me like we would need to complicate quite a lot
of code in the network stack just to pass this {{EndOfData}} to the
{{StreamTask}}:
# As you mentioned, [~sewen], we would need to track how many inputs have
received {{EndOfPartitionEvent}} and/or {{EndOfData}}.
# {{InputGate#isFinished}} would have to be replaced with something more
sophisticated (it's used in quite a few places).
# The same applies to the return status of the
{{StreamTaskNetworkInput#emitNext}} method. Currently it reuses the
public-facing enum {{InputStatus}}, which would have to be changed or enriched
as well, probably affecting other places too ({{SortingDataInput}}?
{{MultiInputSortingDataInput}}?).
# Finally, we would reach {{StreamOneInputProcessor#processInput}}, where we
could ignore this end of input...
# ... but that would still not be enough. We would probably still need to set a
flag in the {{OperatorChain}} (basically a copy of
{{isStoppingBySyncSavepoint}}) to pass the distinction between end of input and
end of partition to the {{OperatorChain}}. This is because the end of the
{{OperatorChain}}/{{StreamTask}} inputs and the de facto {{endOfInput}} calls on
the operators are quite far apart.
# ... and the {{OperatorChain}} would need to handle the case where one of its
inputs has received {{EndOfPartition}} and another {{EndOfData}}, so we would
again need to duplicate the logic from 1.
So we would add a lot of code (1., 2., 3., 4., 6.) and still end up with the
same basic solution (5.)?
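To illustrate why the tracking in the list above would end up written twice (once per channel, once per chain input), here is a minimal sketch of the per-input bookkeeping. It assumes that an input which sees {{EndOfData}} before {{EndOfPartitionEvent}} genuinely finished, while one that sees only {{EndOfPartitionEvent}} was stopped. {{EndOfInputTracker}}, {{InputEnd}} and {{EndStatus}} are hypothetical names, not Flink code.

```java
import java.util.Arrays;

// Hypothetical sketch of the per-input bookkeeping; none of these names exist
// in Flink. "Inputs" can be channels of a gate or inputs of an operator chain:
// the same logic would be needed at both levels.

/** How a single input ended. */
enum InputEnd { RUNNING, END_OF_DATA, END_OF_PARTITION_ONLY }

/** A richer result than a single "end of input" status value. */
enum EndStatus { MORE_AVAILABLE, ALL_END_OF_DATA, ALL_STOPPED, MIXED }

final class EndOfInputTracker {
    private final InputEnd[] inputs;

    EndOfInputTracker(int numberOfInputs) {
        inputs = new InputEnd[numberOfInputs];
        Arrays.fill(inputs, InputEnd.RUNNING);
    }

    /** An EndOfData-style event arrived: the input has no more records. */
    void onEndOfData(int input) {
        inputs[input] = InputEnd.END_OF_DATA;
    }

    /** The partition was closed; without a prior EndOfData it was only stopped. */
    void onEndOfPartition(int input) {
        if (inputs[input] != InputEnd.END_OF_DATA) {
            inputs[input] = InputEnd.END_OF_PARTITION_ONLY;
        }
    }

    /** Simplification: an input counts as ended once EndOfData or EndOfPartition arrived. */
    EndStatus status() {
        int endOfData = 0;
        int stopped = 0;
        for (InputEnd end : inputs) {
            if (end == InputEnd.RUNNING) {
                return EndStatus.MORE_AVAILABLE;
            }
            if (end == InputEnd.END_OF_DATA) {
                endOfData++;
            } else {
                stopped++;
            }
        }
        if (stopped == 0) {
            return EndStatus.ALL_END_OF_DATA; // every input finished: call endOfInput
        }
        if (endOfData == 0) {
            return EndStatus.ALL_STOPPED; // stop-with-savepoint: skip endOfInput
        }
        return EndStatus.MIXED; // some inputs finished, others were only stopped
    }
}
```

The {{MIXED}} result is the case the last list item refers to; the sketch only detects it and leaves the policy open.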
Or am I missing something?
CC [~roman_khachatryan]
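For comparison, the "basic solution" of item 5, a flag on the chain that is consulted right where {{endOfInput}} would be called, could be sketched as below. {{ChainSketch}} and {{EndInputListener}} are hypothetical names, and this is a simplified model rather than Flink's actual {{OperatorChain}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of item 5; not how Flink's OperatorChain is implemented.

/** Stand-in for an operator that wants to be told when an input ends. */
interface EndInputListener {
    void endInput(int inputId);
}

final class ChainSketch {
    private final List<EndInputListener> operators = new ArrayList<>();
    // Per-chain copy of the task-level "stopping by sync savepoint" state,
    // set by the task before it starts stopping with a synchronous savepoint.
    private boolean stoppingBySyncSavepoint;

    void addOperator(EndInputListener operator) {
        operators.add(operator);
    }

    void setStoppingBySyncSavepoint(boolean stopping) {
        this.stoppingBySyncSavepoint = stopping;
    }

    /** Called once the task sees the end of one of its inputs. */
    void endInput(int inputId) {
        if (stoppingBySyncSavepoint) {
            // The inputs ended only because upstream tasks were stopped; the data
            // did not end, so the operators must not be told their input finished.
            return;
        }
        for (EndInputListener operator : operators) {
            operator.endInput(inputId);
        }
    }
}
```

With this shape, no event has to carry the distinction through the network stack; the trade-off is that the chain depends on state set out of band by the task.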
> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
> Key: FLINK-21133
> URL: https://issues.apache.org/jira/browse/FLINK-21133
> Project: Flink
> Issue Type: Bug
> Components: API / Core, API / DataStream, Runtime / Checkpointing
> Affects Versions: 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Priority: Critical
> Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed the branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> to my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}}
> fails due to a timeout.
> See also FLINK-21132 and
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].