Thanks so much! More context: this file merging feature is really promising for our use case. Given the large state size and parallelism, we occasionally run into S3 rate limits at checkpoint/savepoint time.
The few times I tried this file merging, it helped quite a bit, but I've had to turn it off for now due to this NPE, which happens occasionally but not always On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <[email protected]> wrote: > Hi Guys, > > This sounds severe and I'm also taking a look... > > G > > > On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <[email protected]> wrote: > >> Hi Kevin, >> >> Thanks for the details. It's really helpful. I'm trying to reproduce this >> according to your setup. Will let you know if any updates. >> >> I create a jira issue to track this: >> https://issues.apache.org/jira/browse/FLINK-38327 >> >> >> Best, >> Zakelly >> >> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <[email protected]> >> wrote: >> >>> Hi Zakelly, >>> >>> Thanks for the reply. >>> >>> This job is running on Kubernetes using the Apache Flink Kubernetes >>> operator. This NullPointerException happened during a job restart after one >>> of the TaskManagers restarted because the underlying node running the >>> TaskManager pod was scaled down for maintenance. There was no rescaling or >>> parallelism change. >>> >>> The job is quite large due to heavy input traffic + state size: >>> 2100 parallelism >>> taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total) >>> RocksDBStateBackend used for state management. Checkpoints/savepoints >>> are written to S3 in AWS. >>> According to the Flink Checkpoint UI, the full state size is ~600GB >>> >>> Please let me know if more details would be helpful. >>> >>> Best regards, >>> Kevin >>> >>> >>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]> >>> wrote: >>> >>>> Hi Kevin, >>>> >>>> Would you please provide more info about the setup? Is this a failover >>>> or manual job restart with or without a change of parallelism (rescale)? >>>> >>>> >>>> Best, >>>> Zakelly >>>> >>>> >>>> >>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]> wrote: >>>> >>>>> Has anyone seen this NullPointerException after enabling checkpoint >>>>> file merging? I'm running a job with Flink 1.20.2 with these configs: >>>>> >>>>> execution.checkpointing.file-merging.enabled: true >>>>> execution.checkpointing.file-merging.max-file-size: 32m >>>>> execution.checkpointing.timeout: "10min" >>>>> execution.checkpointing.tolerable-failed-checkpoints: 3 >>>>> execution.checkpointing.min-pause: "2min" >>>>> execution.checkpointing.interval: "2min" >>>>> >>>>> java.lang.NullPointerException >>>>> at >>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927) >>>>> at >>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866) >>>>> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220) >>>>> at >>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) >>>>> at >>>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >>>>> at >>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >>>>> at >>>>> java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003) >>>>> at >>>>> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845) >>>>> at >>>>> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276) >>>>> at >>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) >>>>> at >>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) >>>>> at >>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) >>>>> at >>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) >>>>> at >>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858) >>>>> at >>>>> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102) >>>>> at >>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353) >>>>> at >>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >>>>> at >>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) >>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) >>>>> at java.base/java.lang.Thread.run(Thread.java:840) >>>>> >>>>
