Hi Guys,

This sounds severe and I'm also taking a look...

G


On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <[email protected]> wrote:

> Hi Kevin,
>
> Thanks for the details. They're really helpful. I'm trying to reproduce this
> with your setup and will let you know if there are any updates.
>
> I created a Jira issue to track this:
> https://issues.apache.org/jira/browse/FLINK-38327
>
>
> Best,
> Zakelly
>
> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <[email protected]> wrote:
>
>> Hi Zakelly,
>>
>> Thanks for the reply.
>>
>> This job is running on Kubernetes using the Apache Flink Kubernetes
>> operator. This NullPointerException happened during a job restart after one
>> of the TaskManagers restarted because the underlying node running the
>> TaskManager pod was scaled down for maintenance. There was no rescaling or
>> parallelism change.
>>
>> The job is quite large due to heavy input traffic + state size:
>> 2100 parallelism
>> taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
>> RocksDBStateBackend used for state management. Checkpoints/savepoints are
>> written to S3 in AWS.
>> According to the Flink Checkpoint UI, the full state size is ~600GB
>>
>> Please let me know if more details would be helpful.
>>
>> Best regards,
>> Kevin
>>
>>
>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]> wrote:
>>
>>> Hi Kevin,
>>>
>>> Could you please provide more info about the setup? Was this a failover
>>> or a manual job restart, and was there a change of parallelism (rescale)?
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>>
>>>
>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]> wrote:
>>>
>>>> Has anyone seen this NullPointerException after enabling checkpoint
>>>> file merging? I'm running a job with Flink 1.20.2 with these configs:
>>>>
>>>> execution.checkpointing.file-merging.enabled: true
>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>> execution.checkpointing.timeout: "10min"
>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>> execution.checkpointing.min-pause: "2min"
>>>> execution.checkpointing.interval: "2min"
>>>>
>>>> java.lang.NullPointerException
>>>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>> at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>> at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>> at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>> at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>> at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>> at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>> at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>> at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>> at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>>>
>>>
