[ https://issues.apache.org/jira/browse/FLINK-38327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18019033#comment-18019033 ]
Zakelly Lan edited comment on FLINK-38327 at 9/10/25 6:09 AM:
--------------------------------------------------------------
Merged into:
* master: ee73d0f79841413f3a553de87e6ea23eef0e51ab
* 2.1: e107aaddfae4beef386fd34e012ffed8cfea46da
* 2.0: 6a5a1b3662e9ed34b3bbc92046fce07b5bd577b8
* 1.20: fea8f96aa764ca2924e8490c3e1e440d1962d0b4
was (Author: zakelly):
Merge into:
* master: ee73d0f79841413f3a553de87e6ea23eef0e51ab
* 2.1: TBD
* 2.0: TBD
* 1.20: TBD
> NPE during recovery from file-merged checkpoint after FO
> --------------------------------------------------------
>
> Key: FLINK-38327
> URL: https://issues.apache.org/jira/browse/FLINK-38327
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.20.2, 2.1.0
> Reporter: Zakelly Lan
> Assignee: Zakelly Lan
> Priority: Major
> Labels: pull-request-available
>
> Report: https://lists.apache.org/thread/yzcxqdfsfdykgzdfkovf65jbwy4j6g0y
> This job is running on Kubernetes using the Apache Flink Kubernetes operator.
> This NullPointerException happened during a job restart after one of the
> TaskManagers restarted because the underlying node running the TaskManager
> pod was scaled down for maintenance. There was no rescaling or parallelism
> change.
> The job is quite large due to heavy input traffic and state size:
> * parallelism: 2100
> * taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
> RocksDBStateBackend is used for state management. Checkpoints/savepoints are
> written to S3 in AWS.
> According to the Flink Checkpoint UI, the full state size is ~600 GB.
> {code:java}
> execution.checkpointing.file-merging.enabled: true
> execution.checkpointing.file-merging.max-file-size: 32m
> execution.checkpointing.timeout: "10min"
> execution.checkpointing.tolerable-failed-checkpoints: 3
> execution.checkpointing.min-pause: "2min"
> execution.checkpointing.interval: "2min"
> {code}
> {code:java}
> java.lang.NullPointerException
> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
> at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
> at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
> at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
> at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
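To make the trace above easier to reason about, here is a minimal, self-contained Java sketch of the failure pattern it suggests: a restore path that groups restored file handles via HashMap.computeIfAbsent and, inside the mapping function, checks whether each file lives under a "managed" directory, dereferencing a lookup that can be null when the directory was never (re)registered on the replacement TaskManager after the failover. This is not Flink's implementation; every class and field name in the sketch (ManagedDirRegistry, managedDirs, isManaged, ...) is hypothetical and only mirrors the shape of isManagedByFileMergingManager / restoreStateHandles in the trace.
{code:java}
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only -- not Flink code. It reproduces the *shape* of the
// reported NPE: computeIfAbsent -> mapping lambda -> "is this file managed?"
// check that dereferences a directory lookup which can be null after failover.
public class ManagedDirRegistry {

    // Subtask key -> managed checkpoint directory registered on this TaskManager.
    private final Map<String, Path> managedDirs = new HashMap<>();

    void registerSubtask(String subtaskKey, Path dir) {
        managedDirs.put(subtaskKey, dir);
    }

    // Analogue of isManagedByFileMergingManager(...): no null check on the lookup.
    boolean isManaged(String subtaskKey, Path file) {
        Path dir = managedDirs.get(subtaskKey);        // null if never re-registered
        return file.startsWith(dir.toAbsolutePath());  // NPE when dir == null
    }

    // Analogue of restoreStateHandles(...): the check runs inside a
    // HashMap.computeIfAbsent mapping function, as in the stack trace.
    Map<Path, Boolean> restore(String subtaskKey, List<Path> restoredFiles) {
        Map<Path, Boolean> managedCache = new HashMap<>();
        for (Path file : restoredFiles) {
            managedCache.computeIfAbsent(file, f -> isManaged(subtaskKey, f));
        }
        return managedCache;
    }

    public static void main(String[] args) {
        ManagedDirRegistry registry = new ManagedDirRegistry();
        // No registerSubtask(...) call for this key, mimicking a fresh
        // TaskManager that replaced the one lost when the node scaled down.
        registry.restore("op-1-subtask-7", List.of(Path.of("/cp/shared/seg-0001")));
        // -> java.lang.NullPointerException thrown from isManaged(...)
    }
}
{code}
Under that reading, the fix direction would be to tolerate or re-register missing managed-directory entries during restore instead of assuming they always exist; the commits listed at the top of this comment carry the merged fix for master, 2.1, 2.0, and 1.20.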
--
This message was sent by Atlassian Jira
(v8.20.10#820010)