[ https://issues.apache.org/jira/browse/FLINK-31560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yanfei Lei reassigned FLINK-31560: ---------------------------------- Assignee: Yanfei Lei > Savepoint failing to complete with ExternallyInducedSources > ----------------------------------------------------------- > > Key: FLINK-31560 > URL: https://issues.apache.org/jira/browse/FLINK-31560 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing > Affects Versions: 1.16.0 > Reporter: Fan Yang > Assignee: Yanfei Lei > Priority: Major > Attachments: image-2023-03-23-18-03-05-943.png, > image-2023-03-23-18-19-24-482.png, jobmanager_log.txt, > taskmanager_172.28.17.19_6123-f2dbff_log, > tmp_tm_172.28.17.19_6123-f2dbff_tmp_job_83ad4f408d0e7bf30f940ddfa5fe00e3_op_WindowOperator_137df028a798f504a6900a4081c9990c__1_1__uuid_edc681f0-3825-45ce-a123-9ff69ce6d8f1_db_LOG > > > Flink version: 1.16.0 > > We are using Flink to run some streaming applications with Pravega as source > and use window and reduce transformations. We use RocksDB state backend with > incremental checkpointing enabled. We don't enable the latest changelog state > backend. > When we try to stop the job, we encounter issues with the savepoint failing > to complete for the job. This happens most of the time. On rare occasions, > the job gets canceled suddenly with its savepoint get completed successfully. > Savepointing shows below error: > > {code:java} > 2023-03-22 08:55:57,521 [jobmanager-io-thread-1] WARN > org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to > trigger or complete checkpoint 189 for job 7354442cd6f7c121249360680c04284d. > (0 consecutive failed attempts so > far)org.apache.flink.runtime.checkpoint.CheckpointException: Failure to > finalize checkpoint. at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) > ~[flink-dist-1.16.0.jar:1.16.0] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > [?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] > Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: > class org.apache.flink.runtime.state.PlaceholderStreamStateHandle at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:699) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandleMap(MetadataV2V3SerializerBase.java:813) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.java:344) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateCol(MetadataV2V3SerializerBase.java:269) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeSubtaskState(MetadataV2V3SerializerBase.java:262) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeSubtaskState(MetadataV3Serializer.java:142) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:122) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:146) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:100) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:87) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:82) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:333) > ~[flink-dist-1.16.0.jar:1.16.0] at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361) > ~[flink-dist-1.16.0.jar:1.16.0] ... 7 more {code} > > Prior to Flink 1.16, we did not observe this error. Since > `PlaceholderStreamStateHandle` is used to indicate it's a reusable RocksDB > data for incremental checkpoint, we believe that the new improvements of > incremental checkpoint introduced in flink 1.16 release might be related to > this issue. > We require assistance in investigating this issue and finding a solution. -- This message was sent by Atlassian Jira (v8.20.10#820010)