[ https://issues.apache.org/jira/browse/FLINK-36125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18012558#comment-18012558 ]
Grzegorz Liter commented on FLINK-36125:
----------------------------------------
[~zakelly] The checkpoint completed successfully:
2025-08-06 19:26:30,382 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 99272 for job d6212bf4789bc8482aa3ff20d9329d5e (24665280477 bytes, checkpointDuration=317745 ms, finalizationTime=376 ms).
However, 3 seconds later the connection to the TM was lost:
2025-08-06 19:26:33,432 WARN org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://[email protected]:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2025-08-06 19:26:33,433 WARN org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://[email protected]:34173] has failed, address is now gated for [50] ms. Reason: [Disassociated]
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '10.131.44.202/10.131.44.202:37715 [ taskmanager-1-1 ] '. This indicates that the remote task manager was lost.
...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
After that, the entire job restarted and got stuck in a restart loop because of this broken checkpoint.
On the TM side there are similar connection issues; it looks like a random network failure:
```
2025-08-06 19:26:33,423 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - Encountered error while consuming partitions (connection to /10.131.44.202:46824)
2025-08-06 19:26:33.424 org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
```
> File not found exception on restoring state handles with file merging
> ---------------------------------------------------------------------
>
> Key: FLINK-36125
> URL: https://issues.apache.org/jira/browse/FLINK-36125
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.0
> Reporter: Burak Ozakinci
> Assignee: Zakelly Lan
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.20.1, 2.0-preview
>
> Attachments: app_code.txt
>
>
> A 1.20 app with file merging enabled, including the across-checkpoint-boundary option:
> {*}execution.checkpointing.file-merging.enabled{*}: true
> {*}execution.checkpointing.file-merging.across-checkpoint-boundary{*}: true
> The app uses ZooKeeper for HA, S3 (through the Hadoop-based S3 file system) for checkpoint storage, and the RocksDB state backend.
> Summary of events:
> * The app started to fail and restarted due to a ZooKeeper connection failure.
> * While restoring, it reused the previous directory, as shown in the log below:
> {code:java}
> Reusing previous directory s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.195.1-6122-34e093 for checkpoint file-merging.
> {code}
>
> * FileMergingSnapshotManager could not find the checkpoint file under the `taskowned` directory, and the app started to fail over.
>
>
> {code:java}
> java.io.FileNotFoundException: No such file or directory: s3:.../taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:880)
>     at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.base/java.util.stream.Streams$StreamBuilderImpl.forEachRemaining(Streams.java:411)
>     at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1033)
>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
>     at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>     at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:972)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:941)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.FileNotFoundException: No such file or directory: s3://0825968657d315d337418c6a010371c3320d5792/6676b9fb581c449ea78d95efdd338ee1-021367244083-1724062514899/checkpoints/6676b9fb581c449ea78d95efdd338ee1/taskowned/job_6676b9fb581c449ea78d95efdd338ee1_tm_10.99.196.233-6122-acb0af/5ce4c69f-f02a-4f91-a656-9abdfa9d47fd
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
>     at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
>     at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
>     at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:105)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.getFileSize(FileMergingSnapshotManagerBase.java:910)
>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:878)
> {code}
>
>
>
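The stack trace above suggests the shape of the failing restore path: `FileMergingSnapshotManagerBase.restoreStateHandles` walks the restored handles and, inside `HashMap.computeIfAbsent`, calls `getFileSize`, which asks the file system for the status of each referenced physical file. A physical file that has disappeared from the `taskowned` directory therefore aborts the whole restore. Below is a minimal sketch of that pattern, assuming local files through `java.nio.file` instead of S3; `FileMergingRestoreSketch`, `LogicalSegment`, and `restorePhysicalFileSizes` are hypothetical names for illustration, not Flink APIs, and this is not Flink's actual implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of restoring file-merged state handles.
 * Several logical segments can live in one merged physical file; the size of
 * each physical file is looked up once, inside computeIfAbsent.
 * This is an illustration only, not Flink's FileMergingSnapshotManagerBase.
 */
class FileMergingRestoreSketch {

    /** A logical state segment stored at some offset inside a merged physical file. */
    record LogicalSegment(Path physicalFile, long offset, long length) {}

    /** Returns the size of every physical file referenced by the restored segments. */
    static Map<Path, Long> restorePhysicalFileSizes(List<LogicalSegment> segments) {
        Map<Path, Long> sizes = new HashMap<>();
        for (LogicalSegment segment : segments) {
            // Each physical file is queried only once, however many segments point into it.
            sizes.computeIfAbsent(segment.physicalFile(), FileMergingRestoreSketch::fileSize);
        }
        return sizes;
    }

    private static long fileSize(Path file) {
        try {
            // Throws NoSuchFileException if the physical file has been deleted.
            return Files.size(file);
        } catch (IOException e) {
            // computeIfAbsent's mapping function cannot throw checked exceptions,
            // so the I/O error is rethrown wrapped in an unchecked exception.
            throw new UncheckedIOException("Cannot restore from missing file " + file, e);
        }
    }
}
```

In this model, two segments in the same merged file trigger a single size lookup. Once that file is deleted, every call fails with the same exception, so repeated restore attempts from the same checkpoint would all fail in the same way.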
--
This message was sent by Atlassian Jira
(v8.20.10#820010)