Hi Zakelly,

On my side, we've pulled and deployed the fix from the release-1.20
branch[1] in our production canary deployments, and we haven't seen any
NPEs for ~3 days now.
Thanks so much for the quick fix!

Kevin

On Wed, Sep 10, 2025 at 8:24 AM Zakelly Lan <[email protected]> wrote:

> Hi Kevin,
>
> We’ve merged the fix into the 1.20 branch, and AFAIK, the community will
> kick off the next patch release (1.20.3) soon. In the meantime, if you’re
> interested, you may build from the release-1.20 branch[1] to verify the fix.
>
>
> Thank you again for reporting the issue and for your valuable feedback.
>
> [1] https://github.com/apache/flink/tree/release-1.20
>
> Best,
> Zakelly
>
> On Wed, Sep 10, 2025 at 1:33 AM Kevin Kim <[email protected]>
> wrote:
>
>> Thank you! Looking forward to testing the fix on my side.
>>
>> Kevin
>>
>> On Tue, Sep 9, 2025 at 9:15 AM Gabor Somogyi <[email protected]>
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> We've already merged the fix to master; backports are on the way :)
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <[email protected]>
>>> wrote:
>>>
>>>> Hi Zakelly,
>>>>
>>>> Yes, this job contains both operator state and keyed state. Happy to
>>>> provide more details as needed.
>>>>
>>>> Kevin
>>>>
>>>> On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> One more thing I'd like to confirm about the job: is this a DataStream
>>>>> job with operator state (not only keyed state after `keyBy()`)?
>>>>>
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Best,
>>>>> Zakelly
>>>>>
>>>>> On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks so much!
>>>>>>
>>>>>> More context: this file merging feature is really promising for our
>>>>>> use case. Given the large state size and parallelism, we occasionally run
>>>>>> into S3 rate limits at checkpoint/savepoint time.
>>>>>>
>>>>>> The few times I tried this file merging, it helped quite a bit, but
>>>>>> I've had to turn it off for now due to this NPE, which happens
>>>>>> occasionally but not always.
>>>>>>
>>>>>> On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> This sounds severe and I'm also taking a look...
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Kevin,
>>>>>>>>
>>>>>>>> Thanks for the details; they're really helpful. I'm trying to
>>>>>>>> reproduce this according to your setup, and will let you know of any
>>>>>>>> updates.
>>>>>>>>
>>>>>>>> I've created a Jira issue to track this:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-38327
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Zakelly
>>>>>>>>
>>>>>>>> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Zakelly,
>>>>>>>>>
>>>>>>>>> Thanks for the reply.
>>>>>>>>>
>>>>>>>>> This job is running on Kubernetes using the Apache Flink
>>>>>>>>> Kubernetes operator. This NullPointerException happened during a job
>>>>>>>>> restart after one of the TaskManagers restarted because the 
>>>>>>>>> underlying node
>>>>>>>>> running the TaskManager pod was scaled down for maintenance. There 
>>>>>>>>> was no
>>>>>>>>> rescaling or parallelism change.
>>>>>>>>>
>>>>>>>>> The job is quite large due to heavy input traffic and state size:
>>>>>>>>> - Parallelism: 2100
>>>>>>>>> - taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
>>>>>>>>> - RocksDBStateBackend is used for state management.
>>>>>>>>> - Checkpoints/savepoints are written to S3 on AWS.
>>>>>>>>> - According to the Flink Checkpoint UI, the full state size is ~600 GB.
>>>>>>>>>
>>>>>>>>> Please let me know if more details would be helpful.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Kevin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Kevin,
>>>>>>>>>>
>>>>>>>>>> Would you please provide more info about the setup? Is this a
>>>>>>>>>> failover or manual job restart with or without a change of 
>>>>>>>>>> parallelism
>>>>>>>>>> (rescale)?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Zakelly
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Has anyone seen this NullPointerException after enabling
>>>>>>>>>>> checkpoint file merging? I'm running a job with Flink 1.20.2 with 
>>>>>>>>>>> these
>>>>>>>>>>> configs:
>>>>>>>>>>>
>>>>>>>>>>> execution.checkpointing.file-merging.enabled: true
>>>>>>>>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>>>>>>>>> execution.checkpointing.timeout: "10min"
>>>>>>>>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>>>>>>>>> execution.checkpointing.min-pause: "2min"
>>>>>>>>>>> execution.checkpointing.interval: "2min"
>>>>>>>>>>>
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>>>>>>>>> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>> at
>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>>>>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>>>>>>>>>>
>>>>>>>>>>
