[
https://issues.apache.org/jira/browse/FLINK-37605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940559#comment-17940559
]
Arvid Heise commented on FLINK-37605:
-------------------------------------
On a second look, there is a deeper issue buried here (that's why I added the
assertion in the first place): what is the interaction between committables
generated during EOI and those that follow afterwards? Specifically, how does it
interact with the checkpoint subsumption contract?
Picking up the example from above, let's assume that subtasks 4 and 5 emit data
before sending EOI for some reason (unbalanced splits?). We are running exactly-
once, so each checkpoint has a unique transaction per subtask, which is
forwarded to the committer.
Then on checkpoint 1, we simply have 3 transactions being forwarded to the
committer, one for each subtask. These need to be remembered by checkpoint id,
so for committer subtask X, we have the committer state
committables=\{1=[transaction-X]}.
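As a toy model (plain Python, not Flink's actual committer classes; the function name is made up for illustration), the per-subtask committer state can be pictured as a map from checkpoint id to pending committables:

```python
# Toy model: per-subtask committer state maps checkpoint id -> pending committables.
# "EOI" can be used as a special key for end-of-input committables.
def add_committable(state, checkpoint_id, committable):
    """Record a committable under its checkpoint id (or the EOI marker)."""
    state.setdefault(checkpoint_id, []).append(committable)
    return state

# After checkpoint 1, each of the three committer subtasks holds one transaction.
states = {x: add_committable({}, 1, f"transaction-{x}") for x in (1, 2, 3)}
print(states[2])  # {1: ['transaction-2']}
```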
On upscaling during recovery, we recommit all of those, so the committer
states will be {}.
Now at EOI, we are already forwarding the committables of subtasks 4 and 5, such that
the committer state is committables=\{EOI=[transaction-X]} for subtasks 4 and 5.
On checkpoint 2, subtasks 1, 2, and 3 result in committables=\{2=[transaction-X]}.
On downscaling during recovery, states are folded such that subtask 2 will have
committables=\{2=[transaction-3], EOI=[transaction-4]}. We recommit
everything, resulting in committer state {}.
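The fold on downscale can be sketched as a plain dict merge (again a toy model with an invented function name, not Flink's actual state-assignment logic):

```python
def fold_states(*states):
    """Merge committer states from several old subtasks into one new subtask."""
    merged = {}
    for state in states:
        for checkpoint_id, committables in state.items():
            merged.setdefault(checkpoint_id, []).extend(committables)
    return merged

# New subtask 2 receives old subtask 3's checkpoint-2 state
# and old subtask 4's EOI state.
folded = fold_states({2: ["transaction-3"]}, {"EOI": ["transaction-4"]})
print(folded)  # {2: ['transaction-3'], 'EOI': ['transaction-4']}
```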
For checkpoint 3, subtask 2 would have a new state
committables=\{3=[transaction-2]}.
So I guess it kind of works because we are guaranteed to recommit everything
before accepting new committables during recovery. So we will never have the
situation where a committable of checkpoint Y comes after an EOI committable.
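That ordering argument can be modeled as follows (a hedged sketch in plain Python, with invented names; the key point is that recovery drains the restored state, including EOI entries, before any checkpoint-Y committable is accepted):

```python
def recover(folded_state, committed_log):
    """On recovery, recommit everything from the restored state, then start empty."""
    # Illustrative commit order: lower checkpoint ids first, then "EOI".
    for checkpoint_id, committables in sorted(folded_state.items(),
                                              key=lambda kv: str(kv[0])):
        committed_log.extend(committables)
    return {}  # committer state is empty before any new committables arrive

log = []
state = recover({2: ["transaction-3"], "EOI": ["transaction-4"]}, log)
# Only afterwards do new committables from checkpoint 3 arrive:
state = {3: ["transaction-2"]}
print(log)    # ['transaction-3', 'transaction-4']
print(state)  # {3: ['transaction-2']}
```

So within any single committer state, an EOI entry is never followed by a later-checkpoint entry.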
> SinkWriter may incorrectly infer end of input during rescale
> ------------------------------------------------------------
>
> Key: FLINK-37605
> URL: https://issues.apache.org/jira/browse/FLINK-37605
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 2.0.0, 1.19.2, 1.20.1
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
>
> FLINK-25920 introduced an EOI check that uses state to ensure that no data is
> lost after failover during the final checkpoint.
> However, the check is too strict and can also trigger on other occasions:
> * Consider a simple pipeline DataGeneratorSource -> Sink
> * Start a run with parallelism 3; the source generates 3 splits
> * Checkpoint 1
> * Upscale to 5; the source still only has 3 splits, so subtasks 4 and 5 finish
> * EOI arrives at sink subtasks 4 and 5
> * Checkpoint 2 includes EOI for those subtasks
> * Downscale back to 3
> * All source subtasks have active splits
> * Sink subtasks get the following EOI states: 1=[false, false], 2=[false,
> true], 3=[true]
> * So sink 3 assumes that it won't receive any more input and fails the
> assertion
> The assertion is not salvageable, and we need to get rid of it entirely. The
> sink needs to deal with "duplicate" EOIs:
> * The writer will simply emit duplicate EOI committables/summary
> * The committer needs to merge them. It already does since FLINK-25920.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)