Hangxiang Yu created FLINK-37686:
------------------------------------
Summary: ForSt consecutive failed checkpoint due to check deleted
file status when enableFileDeletions
Key: FLINK-37686
URL: https://issues.apache.org/jira/browse/FLINK-37686
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 2.1.0
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu
Currently, ForSt tries to reuse existing files when taking a checkpoint.
For a reused file, the FileOwnership is NOT_OWNED:
{code:java}
2025-04-16 13:03:55,952 INFO
org.apache.flink.state.forst.fs.filemapping.FileMappingManager [] - Give up
ownership for file: MappingEntry{source=HandleBackedSource{stateHandle=File
State:
hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
[52273557 bytes]}, fileOwnership=NOT_OWNED, isDirectory= false}, the source is
now backed by: File State:
hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
[52273557 bytes] {code}
This file may then be deleted by the JM in the asynchronous checkpoint cleaner thread:
{code:java}
2025-04-16 13:07:01,920 INFO
org.apache.flink.runtime.state.SharedStateRegistryImpl [] - Scheduled
delete of state handle File State:
hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
[52273557 bytes].{code}
However, the file may still be recorded in ForSt's ObsoleteFiles. Every time
enableFileDeletions is called, ForSt checks whether all obsolete files still
exist, and the call may fail if a file has already been deleted by the JM:
{code:java}
2025-04-16 13:07:53,123 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 8 (type=CheckpointType{name='Checkpoint',
sharingFilesStrategy=FORWARD_BACKWARD}) @ 1744780073120 for job
e3fca74b64c70018276be5ab859a2ce2.
2025-04-16 13:08:04,848 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline
checkpoint 8 by task
8bbfb2b8ab9d60131688ca9a1b38fa4c_90bea66de1c231edf33913ecd54406c1_0_0 of job
e3fca74b64c70018276be5ab859a2ce2 at
application-flink-k8s-1744779470362-5783573-taskmanager-1-1.
org.apache.flink.util.SerializedThrowable:
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task
checkpoint failed.
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception:
Could not materialize checkpoint 8 for operator GroupAggregate[4] -> Calc[5] ->
Sink: print_sink[6] (1/1)#0.
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 4 more
Caused by: org.apache.flink.util.SerializedThrowable:
java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File
does not exist:
hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 3 more
Caused by: org.apache.flink.util.SerializedThrowable:
java.io.FileNotFoundException: File does not exist:
hdfs://k8s-flink-test/checkpoint/39779093/shared/op_KeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__attempt_0/db/cfc1d618-6e02-42f7-8718-553f23f6e08e
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1417)
~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1409)
~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1425)
~[flink-shaded-hadoop-2-uber-2.7.2-10.7.jar:2.7.2-10.7]
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:320)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:356)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.forstdb.RocksDB.enableFileDeletions(Native Method)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.lambda$syncPrepareResources$3(ForStNativeFullSnapshotStrategy.java:188)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase$ForStNativeSnapshotResources.release(ForStSnapshotStrategyBase.java:247)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy$ForStIncrementalSnapshotOperation.get(ForStIncrementalSnapshotStrategy.java:287)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at
org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
~[flink-dist_2.12-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 3 more{code}
We should add a special check for NOT_OWNED files when enableFileDeletions is called.
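As a rough illustration of what such a check could look like, here is a minimal, self-contained sketch. It does not use the real ForSt classes: `ObsoleteFileCheckSketch`, `Ownership`, `Entry`, and `listStatus` are hypothetical stand-ins, and local `java.nio.file` paths replace the HDFS-backed mapping entries. The assumed policy is that a missing NOT_OWNED file is skipped (the JM may have deleted it legitimately), while a missing OWNED file still raises FileNotFoundException.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are the real ForSt classes. */
final class ObsoleteFileCheckSketch {

    /** Stand-in for the ownership flag that appears in the log above. */
    enum Ownership { OWNED, NOT_OWNED }

    /** Stand-in for a mapping entry: a backing path plus its ownership. */
    static final class Entry {
        final Path path;
        final Ownership ownership;

        Entry(Path path, Ownership ownership) {
            this.path = path;
            this.ownership = ownership;
        }
    }

    /**
     * Returns the paths that still exist. A missing NOT_OWNED file is
     * tolerated because another component may have deleted it; a missing
     * OWNED file is still reported as an error.
     */
    static List<Path> listStatus(List<Entry> entries) throws IOException {
        List<Path> present = new ArrayList<>();
        for (Entry entry : entries) {
            if (Files.exists(entry.path)) {
                present.add(entry.path);
            } else if (entry.ownership == Ownership.NOT_OWNED) {
                // Assumption: whoever owns the file removed it, so skip it.
                continue;
            } else {
                throw new FileNotFoundException("File does not exist: " + entry.path);
            }
        }
        return present;
    }
}
```

Whether the tolerant path belongs in the file-system wrapper or in the obsolete-file bookkeeping itself is a design decision this sketch does not settle.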