Yun Gao created FLINK-25305:
-------------------------------
Summary: Always wait for input channel state and result partition
state get completed in AsyncRunnable
Key: FLINK-25305
URL: https://issues.apache.org/jira/browse/FLINK-25305
Project: Flink
Issue Type: Sub-task
Components: Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Yun Gao
{code:java}
29245 [jobmanager-io-thread-12] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 16 by task 07fea3eb73acb4898317b4aa2c9fea30 of job da6de908107aa847cde5e9e0beb4812b at 064277c9-73dc-4bf2-8729-91ab16bbe8c6 @ localhost (dataPort=-1).
org.apache.flink.util.SerializedThrowable: Asynchronous task checkpoint failed.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:321) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:158) ~[classes/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 16 for operator keyed (1/5)#5.
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:299) ~[classes/:?]
    ... 4 more
Caused by: org.apache.flink.util.SerializedThrowable
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[classes/:?]
    at org.apache.flink.util.Preconditions.checkCompletedNormally(Preconditions.java:261) ~[classes/:?]
    at org.apache.flink.util.concurrent.FutureUtils.checkStateAndGet(FutureUtils.java:1193) ~[classes/:?]
    at org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder.build(CheckpointMetricsBuilder.java:133) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.reportCompletedSnapshotStates(AsyncCheckpointRunnable.java:248) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:139) ~[classes/:?]
    ... 3 more
{code}
When both unaligned checkpoints and final checkpoints are enabled, some
checkpoints fail with the above exception in the asynchronous phase,
indicating that the checkpoint metric futures have not all been completed.
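To make the failure mode concrete, here is a minimal, self-contained model of the check that fails in the trace. It is a hypothetical simplification, not Flink's code: the class `MetricFutureCheck` and its `checkStateAndGet` helper are assumptions that only mirror the idea behind `FutureUtils.checkStateAndGet` / `Preconditions.checkCompletedNormally` in the stack trace, namely that a metric future must already be completed normally when the metrics are built.

```java
import java.util.concurrent.CompletableFuture;

public class MetricFutureCheck {

    // Hypothetical stand-in for the precondition seen in the trace: the future
    // must already be completed normally, otherwise IllegalStateException is thrown.
    static <T> T checkStateAndGet(CompletableFuture<T> future) {
        if (!future.isDone() || future.isCompletedExceptionally()) {
            throw new IllegalStateException("Metric future is not completed normally");
        }
        // join() does not block here because the future is known to be done.
        return future.join();
    }

    public static void main(String[] args) {
        // The alignment duration is only known once the last barrier has arrived.
        CompletableFuture<Long> alignmentDurationNanos = new CompletableFuture<>();

        try {
            // Building the metrics too early, before the alignment has finished.
            checkStateAndGet(alignmentDurationNanos);
        } catch (IllegalStateException e) {
            System.out.println("build failed: " + e.getMessage());
        }

        // Once the last barrier arrives, the future is completed and the read succeeds.
        alignmentDurationNanos.complete(42L);
        System.out.println("alignment duration: " + checkStateAndGet(alignmentDurationNanos));
    }
}
```

The check deliberately does not block: it assumes the caller has already waited for everything the metric depends on, which is exactly the assumption violated in this issue.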
The exception is likely caused by the following: when an operator has been
closed during the first run, or a task is restored after it had previously fully
finished, we skip snapshotting the state of the operators when taking a
checkpoint. In particular, we also do not include the InputChannelStates and the
ResultPartitionState attached to the operator. With unaligned checkpoints, this
leads to the following bad case:
1. The task receives the first barrier.
2. Following the unaligned checkpoint process, the task snapshots the state
of the operators.
3. The checkpoint starts its asynchronous part.
4. Normally, the asynchronous part waits until all state futures are done,
including the channel states and the result partition states. This ensures
that the asynchronous part waits until the last barrier has arrived. However,
when there are closed operators or fully finished tasks, these states are not
included, so this guarantee is broken.
5. The asynchronous part then fails when it tries to build the
CheckpointMetrics, since the alignment for this checkpoint is in fact not yet
done.
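The bad case above, and the direction given in the issue summary (always waiting for the input channel state and result partition state futures in the asynchronous part), can be modeled with plain `CompletableFuture`s. This is a hypothetical sketch, not Flink's `AsyncCheckpointRunnable`: the class `AsyncPhaseSketch`, the `CheckpointFutures` holder and the `alwaysWait` flag are assumptions for illustration, and "building the metrics" is reduced to checking whether the alignment future is already done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncPhaseSketch {

    // Hypothetical per-checkpoint futures; with unaligned checkpoints all of them
    // are only completed once the last barrier has arrived.
    static final class CheckpointFutures {
        final CompletableFuture<Void> inputChannelState = new CompletableFuture<>();
        final CompletableFuture<Void> resultPartitionState = new CompletableFuture<>();
        final CompletableFuture<Long> alignmentDurationNanos = new CompletableFuture<>();

        void onLastBarrier(long alignmentNanos) {
            alignmentDurationNanos.complete(alignmentNanos);
            inputChannelState.complete(null);
            resultPartitionState.complete(null);
        }
    }

    // Futures the async phase waits for before building the metrics.
    // Operator state futures are omitted: for finished operators nothing is snapshotted.
    static List<CompletableFuture<?>> futuresToAwait(
            CheckpointFutures f, boolean operatorFinished, boolean alwaysWait) {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        // Old behavior: finished operators also drop the channel and partition state futures.
        if (!operatorFinished || alwaysWait) {
            futures.add(f.inputChannelState);
            futures.add(f.resultPartitionState);
        }
        return futures;
    }

    // Starts the async phase; the result is true if the alignment future was
    // already done when the metrics were built (i.e. building them would succeed).
    static CompletableFuture<Boolean> startAsyncPhase(
            CheckpointFutures f, boolean operatorFinished, boolean alwaysWait) {
        CompletableFuture<?>[] awaited = futuresToAwait(f, operatorFinished, alwaysWait)
                .toArray(new CompletableFuture<?>[0]);
        // allOf() over an empty array is already complete, so the metrics are built at once.
        return CompletableFuture.allOf(awaited)
                .thenApply(ignored -> f.alignmentDurationNanos.isDone());
    }

    static boolean simulate(boolean operatorFinished, boolean alwaysWait) {
        CheckpointFutures f = new CheckpointFutures();
        // Steps 1-3: first barrier received, operators snapshotted, async part started.
        CompletableFuture<Boolean> metricsBuilt = startAsyncPhase(f, operatorFinished, alwaysWait);
        // The last barrier only arrives afterwards.
        f.onLastBarrier(1_000L);
        return metricsBuilt.join();
    }

    public static void main(String[] args) {
        System.out.println("running operator: " + simulate(false, false)); // true
        System.out.println("finished operator, old: " + simulate(true, false)); // false
        System.out.println("finished operator, always wait: " + simulate(true, true)); // true
    }
}
```

In the sketch, the only difference between the failing and the passing finished-operator case is whether the channel and partition state futures are part of the awaited set; the operator state itself stays skipped in both.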
--
This message was sent by Atlassian Jira
(v8.20.1#820001)