Zakelly Lan created FLINK-38327:
-----------------------------------
Summary: NPE during recovery from file-merged checkpoint after failover (FO)
Key: FLINK-38327
URL: https://issues.apache.org/jira/browse/FLINK-38327
Project: Flink
Issue Type: Bug
Affects Versions: 1.20.2, 2.0.0, 2.1.0
Reporter: Zakelly Lan
Assignee: Zakelly Lan
Report: https://lists.apache.org/thread/yzcxqdfsfdykgzdfkovf65jbwy4j6g0y
This job runs on Kubernetes using the Apache Flink Kubernetes Operator.
This NullPointerException occurred during a job restart after one of the
TaskManagers restarted because the node hosting its pod was scaled down for
maintenance. There was no rescaling or parallelism change.
The job is quite large due to heavy input traffic and a large state size:
* Parallelism: 2100
* taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers in total)
* RocksDBStateBackend is used for state management; checkpoints/savepoints are
written to S3 in AWS.
* According to the Flink Checkpoint UI, the full state size is ~600 GB.
{code:java}
execution.checkpointing.file-merging.enabled: true
execution.checkpointing.file-merging.max-file-size: 32m
execution.checkpointing.timeout: "10min"
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.checkpointing.min-pause: "2min"
execution.checkpointing.interval: "2min"
{code}
{code:java}
java.lang.NullPointerException
at
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
at
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
at
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
at
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at
java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
at
org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)