[ 
https://issues.apache.org/jira/browse/FLINK-38327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18018854#comment-18018854
 ] 

Zakelly Lan commented on FLINK-38327:
-------------------------------------

I found a bug that could lead to this issue.

The file-merging manager’s recovery logic uses the Operator ID to parse the 
destination directory, whereas the manager initialization uses the Job Vertex 
ID. This mismatch can lead to an NPE. It can happen when restoring operators 
that are not at the head of an operator chain, because for such operators the 
Operator ID differs from the Job Vertex ID. In SQL jobs, this kind of operator 
typically has no state to recover, but in DataStream jobs it can.

The fix is to use the Job Vertex ID consistently, in both recovery and initialization.

> NPE during recovery from file-merged checkpoint after FO
> --------------------------------------------------------
>
>                 Key: FLINK-38327
>                 URL: https://issues.apache.org/jira/browse/FLINK-38327
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.20.2, 2.1.0
>            Reporter: Zakelly Lan
>            Assignee: Zakelly Lan
>            Priority: Major
>
> Report: https://lists.apache.org/thread/yzcxqdfsfdykgzdfkovf65jbwy4j6g0y
> This job is running on Kubernetes using the Apache Flink Kubernetes operator. 
> This NullPointerException happened during a job restart after one of the 
> TaskManagers restarted because the underlying node running the TaskManager 
> pod was scaled down for maintenance. There was no rescaling or parallelism 
> change.
> The job is quite large due to heavy input traffic + state size:
> 2100 parallelism
> taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
> RocksDBStateBackend used for state management. Checkpoints/savepoints are 
> written to S3 in AWS.
> According to the Flink Checkpoint UI, the full state size is ~600GB
> {code:java}
> execution.checkpointing.file-merging.enabled: true
> execution.checkpointing.file-merging.max-file-size: 32m
> execution.checkpointing.timeout: "10min"
> execution.checkpointing.tolerable-failed-checkpoints: 3
> execution.checkpointing.min-pause: "2min"
> execution.checkpointing.interval: "2min"
> {code}
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
> at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
> at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
> at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
> at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at 
> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> at 
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
> at 
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
> at 
> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
> at 
> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at 
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
> at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
> at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at 
> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
> at 
> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
> at 
> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:840)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to