[ 
https://issues.apache.org/jira/browse/FLINK-21133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17285209#comment-17285209
 ] 

Piotr Nowojski edited comment on FLINK-21133 at 2/16/21, 2:04 PM:
------------------------------------------------------------------

As I mentioned in FLINK-21132, in principle I also think that replacing 
{{isStoppingBySyncSavepoint}} with an event is a cleaner solution on the 
conceptual level.

One extra complexity is the [closing 
handshake|https://github.com/apache/flink/pull/14831#issuecomment-774642652] 
that we are probably going to introduce as part of FLINK-21086. It will also 
mean that we need to support unaligned checkpoints overtaking the current 
{{EndOfPartitionEvent}}. Overtaking also means persisting the event as part of 
the checkpoint and potentially restoring it later.
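
As a toy, non-Flink illustration of what "overtaking" implies here (everything 
below is made up for illustration, not runtime code): with unaligned 
checkpoints the barrier is handled ahead of the data still buffered in the 
channel, so the buffered data, which may now include the 
end-of-partition/end-of-data event, has to go into the checkpoint as channel 
state and be put back on restore.

{code:java}
// Toy sketch only -- not real Flink classes.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ToyChannel {
    private final Deque<Object> buffered = new ArrayDeque<>();

    void enqueue(Object bufferOrEvent) {
        buffered.addLast(bufferOrEvent);
    }

    /** Unaligned checkpoint: persist everything the barrier overtakes,
     *  possibly including an end-of-partition / end-of-data event. */
    List<Object> snapshotOnBarrier() {
        return new ArrayList<>(buffered);
    }

    /** Restore: put the persisted buffers/events back in front of new data,
     *  preserving their original order. */
    void restore(List<Object> persisted) {
        for (int i = persisted.size() - 1; i >= 0; i--) {
            buffered.addFirst(persisted.get(i));
        }
    }
}
{code}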

From this perspective I would lean more towards adding a boolean flag to the 
{{EndOfPartitionEvent}}, as opposed to adding a new {{EndOfData}} event. 
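
Just to spell out the two alternatives, here is a purely illustrative sketch 
(the class names below are made up; the real {{EndOfPartitionEvent}} also has 
serialization and singleton-usage concerns that are ignored here):

{code:java}
// Hypothetical sketch of the two alternatives; not the real Flink classes.

// Alternative A: enrich the end-of-partition event with a flag that says
// whether the upstream actually drained all of its data ("end of data").
final class EndOfPartitionEventSketch {
    private final boolean endOfData;

    EndOfPartitionEventSketch(boolean endOfData) {
        this.endOfData = endOfData;
    }

    boolean isEndOfData() {
        return endOfData;
    }
}

// Alternative B: keep end-of-partition as-is and introduce a separate marker
// event that is sent just before the partition is closed.
final class EndOfDataEventSketch {
    // marker event, no payload
}
{code}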

However, the more I think about it, the less convinced I am that the added 
complexity of the {{EndOfData}} event (or of enriching {{EndOfPartitionEvent}} 
with a flag) is worth it. It looks to me like we would need to complicate 
quite a lot of code in the network stack just to pass this {{EndOfData}} to 
the {{StreamTask}}.

# As you mentioned, [~sewen], we would need to track how many inputs have 
received {{EndOfPartitionEvent}} and/or {{EndOfData}}.
# {{InputGate#isFinished}} would have to be replaced with something more 
sophisticated (it's used in quite a few places).
# The same applies to the return status of the 
{{StreamTaskNetworkInput#emitNext}} method. Currently it re-uses the 
public-facing enum {{InputStatus}}. That would have to be changed/enriched as 
well, which would probably affect other places too ({{SortingDataInput}}? 
{{MultiInputSortingDataInput}}?).
# Finally we would reach {{StreamOneInputProcessor#processInput}}, where we 
could ignore this end of input...
# ... but that would still not be enough. We would probably still need to set 
a flag in the {{OperatorChain}} (basically a copy of 
{{isStoppingBySyncSavepoint}}) to pass the distinction between end of input 
and end of partition to the {{OperatorChain}}... This is because the end of 
the {{OperatorChain}}/{{StreamTask}} inputs and the de facto {{endOfInput}} 
calls on the operators are quite far apart.
# ... and the {{OperatorChain}} would need to handle the case where one of its 
inputs has received {{EndOfPartition}} and another {{EndOfData}}, so we would 
again need to duplicate the logic from 1. (see the sketch below the list).
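
To make the duplication in 1. and 6. more concrete, here is a rough, purely 
hypothetical sketch of the per-input bookkeeping (all names are made up, this 
is not existing Flink code). One copy of it would have to live around the 
{{InputGate}}/{{StreamTaskNetworkInput}}, and a second copy inside the 
{{OperatorChain}}:

{code:java}
// Hypothetical sketch of the bookkeeping from points 1. and 6.; names are
// made up and do not refer to real Flink classes.
final class EndOfInputTracker {
    private final int numberOfInputs;
    private int inputsWithEndOfData;           // drained all data
    private int inputsWithEndOfPartitionOnly;  // closed without end of data

    EndOfInputTracker(int numberOfInputs) {
        this.numberOfInputs = numberOfInputs;
    }

    void onEndOfData() {
        inputsWithEndOfData++;
    }

    void onEndOfPartitionWithoutEndOfData() {
        inputsWithEndOfPartitionOnly++;
    }

    /** Every input is done and all of them signalled end of data:
     *  safe to call endOfInput() on the operators. */
    boolean shouldCallEndOfInput() {
        return inputsWithEndOfData == numberOfInputs;
    }

    /** Every input is done, but at least one finished without end of data
     *  (e.g. stop-with-savepoint): do not call endOfInput(). */
    boolean finishedWithoutEndOfInput() {
        return inputsWithEndOfData + inputsWithEndOfPartitionOnly == numberOfInputs
                && inputsWithEndOfPartitionOnly > 0;
    }
}
{code}

The point is that exactly this distinction (all inputs drained vs. at least 
one input merely closed) would have to be derived twice: once in the 
network/input layer and once again in the {{OperatorChain}}.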

So we would add a lot of code (1., 2., 3., 4., 6.) and still end up with 
essentially the same basic solution (5.)?

Or am I missing something?

CC [~roman_khachatryan]


> FLIP-27 Source does not work with synchronous savepoint
> -------------------------------------------------------
>
>                 Key: FLINK-21133
>                 URL: https://issues.apache.org/jira/browse/FLINK-21133
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream, Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Critical
>             Fix For: 1.11.4, 1.13.0, 1.12.3
>
>
> I have pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. {{SavepointITCase.testStopSavepointWithFlip27Source}} 
> failed due to timeout.
> See also FLINK-21132 and 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033].



