[
https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Paul updated FLINK-37747:
--------------------------------
Fix Version/s: 2.0.1
> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
> Key: FLINK-37747
> URL: https://issues.apache.org/jira/browse/FLINK-37747
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.0.0, 1.19.2, 1.20.1, 2.1.0
> Reporter: David
> Assignee: David
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 2.1.0, 2.0.1, 1.20.3
>
>
> Hey,
> Our Flink job, which uses the Flink Delta connector, frequently stopped
> writing to its Delta table. After investigating, we found that the
> [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207]
> function in GlobalCommitterOperator returns early at the check for whether
> a checkpoint has finished (this
> [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
> The issue happens when:
> * the autoscaler scales up the chained writer/committer (the direct upstream
> operator of GlobalCommitterOperator)
> * the job first ran on a limited number of TMs with a lower parallelism for
> the writer/committer, and the writer/committer was then scaled up to a
> higher parallelism
> After debugging with additional logging, we found the cause of the issue.
> For example:
> * checkpoint 3 completed successfully with 3 parallel writer/committer
> subtasks
> ** all committables in the writer/committer were saved into the checkpoint
> state of checkpoint 3
> * the writer/committer was scaled up to 5 parallel subtasks
> * the writer/committer subtasks restore their state from checkpoint 3 and
> emit the committables from checkpoint 3. The code is
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
> ** the latest writer/committer parallelism, which is 5, is used in the
> CommittableSummary. The code is
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
> * GlobalCommitterOperator receives a committable summary from the
> writer/committer, so it assumes:
> ** there are 5 parallel writer/committer subtasks upstream
> ** it must wait for a committable summary from each of the 5 upstream
> writer/committer subtasks
> * only 3 writer/committer subtasks emit a CommittableSummary to the global
> committer operator, because only 3 restored state from checkpoint 3
> * the global committer operator is stuck forever, because it keeps waiting
> for committable summaries from 5 upstream subtasks
> We have a quick solution for this case and have raised a PR to fix it.
> We are using Flink 1.20, but the issue still exists on the master
> branch.
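
The rescaling scenario described above can be sketched as a small standalone simulation. This is only an illustration of the counting mismatch, not Flink's actual code: `Summary`, `SummaryCollector`, and `canCommit` are hypothetical names, and the sketch assumes, as in the report, that the global committer waits until it has one summary per announced upstream subtask.

```java
import java.util.HashSet;
import java.util.Set;

public class GlobalCommitStallSketch {

    /** Hypothetical stand-in for a committable summary sent by one upstream subtask. */
    private static final class Summary {
        final int subtaskId;
        final int numberOfSubtasks; // parallelism announced by the sender
        final long checkpointId;

        Summary(int subtaskId, int numberOfSubtasks, long checkpointId) {
            this.subtaskId = subtaskId;
            this.numberOfSubtasks = numberOfSubtasks;
            this.checkpointId = checkpointId;
        }
    }

    /** Hypothetical collector: ready only once every announced subtask has reported. */
    private static final class SummaryCollector {
        private final Set<Integer> reported = new HashSet<>();
        private int expected = -1;

        void add(Summary summary) {
            expected = summary.numberOfSubtasks;
            reported.add(summary.subtaskId);
        }

        boolean hasReceivedAll() {
            return expected >= 0 && reported.size() == expected;
        }
    }

    /**
     * Simulates restoring checkpoint 3: only the subtasks that hold restored
     * state emit a summary, but each one announces the given parallelism.
     */
    public static boolean canCommit(int subtasksWithState, int announcedParallelism) {
        SummaryCollector collector = new SummaryCollector();
        for (int subtask = 0; subtask < subtasksWithState; subtask++) {
            collector.add(new Summary(subtask, announcedParallelism, 3L));
        }
        return collector.hasReceivedAll();
    }

    public static void main(String[] args) {
        // No rescaling: 3 subtasks report and 3 are expected.
        System.out.println("parallelism 3 -> 3: " + canCommit(3, 3));
        // Scaled up to 5: still only 3 subtasks report, but 5 are expected.
        System.out.println("parallelism 3 -> 5: " + canCommit(3, 5));
    }
}
```

In this sketch the second call never becomes ready to commit, which corresponds to the early return in the commit function described in the report.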
--
This message was sent by Atlassian Jira
(v8.20.10#820010)