[ 
https://issues.apache.org/jira/browse/FLINK-31560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-31560:
----------------------------------

    Assignee: Yanfei Lei

> Savepoint failing to complete with ExternallyInducedSources
> -----------------------------------------------------------
>
>                 Key: FLINK-31560
>                 URL: https://issues.apache.org/jira/browse/FLINK-31560
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Fan Yang
>            Assignee: Yanfei Lei
>            Priority: Major
>         Attachments: image-2023-03-23-18-03-05-943.png, 
> image-2023-03-23-18-19-24-482.png, jobmanager_log.txt, 
> taskmanager_172.28.17.19_6123-f2dbff_log, 
> tmp_tm_172.28.17.19_6123-f2dbff_tmp_job_83ad4f408d0e7bf30f940ddfa5fe00e3_op_WindowOperator_137df028a798f504a6900a4081c9990c__1_1__uuid_edc681f0-3825-45ce-a123-9ff69ce6d8f1_db_LOG
>
>
> Flink version: 1.16.0
>  
> We use Flink to run streaming applications that read from Pravega as the 
> source and apply window and reduce transformations. We use the RocksDB state 
> backend with incremental checkpointing enabled. We have not enabled the new 
> changelog state backend.
> When we try to stop the job, the savepoint fails to complete. This happens 
> most of the time. On rare occasions, the job is canceled suddenly and its 
> savepoint completes successfully.
> The savepoint fails with the following error:
>  
> {code:java}
> 2023-03-22 08:55:57,521 [jobmanager-io-thread-1] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 189 for job 7354442cd6f7c121249360680c04284d. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.16.0.jar:1.16.0]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:699) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandleMap(MetadataV2V3SerializerBase.java:813) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.java:344) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateCol(MetadataV2V3SerializerBase.java:269) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeSubtaskState(MetadataV2V3SerializerBase.java:262) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeSubtaskState(MetadataV3Serializer.java:142) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:122) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:146) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:87) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:82) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:333) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361) ~[flink-dist-1.16.0.jar:1.16.0]
>     ... 7 more
> {code}
>  
> Prior to Flink 1.16, we did not observe this error. Since 
> `PlaceholderStreamStateHandle` marks RocksDB data that can be reused across 
> incremental checkpoints, we believe the incremental checkpoint improvements 
> introduced in the Flink 1.16 release might be related to this issue.
> We would appreciate help investigating this issue and finding a solution.
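
The failure mode in the trace can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: the `Toy*` types below are hypothetical stand-ins, not Flink's classes, and the sketch does not establish the root cause of the issue. It only shows how a serializer that dispatches on concrete handle types rejects a placeholder handle that was never swapped for a real one before the metadata is written.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-ins for illustration; none of these are Flink classes.
interface ToyStateHandle {}

// A handle that points at concrete, already-uploaded state.
final class ToyFileHandle implements ToyStateHandle {
    final String path;
    ToyFileHandle(String path) { this.path = path; }
}

// A marker meaning "this state was uploaded by an earlier checkpoint".
// It carries no location, so it cannot be written into metadata as-is.
final class ToyPlaceholderHandle implements ToyStateHandle {}

final class ToyMetadataSerializer {
    // Dispatches on the concrete handle type; anything unrecognized is an error.
    static void serialize(ToyStateHandle handle, DataOutputStream out) throws IOException {
        if (handle instanceof ToyFileHandle) {
            out.writeByte(1); // type tag chosen arbitrarily for this sketch
            out.writeUTF(((ToyFileHandle) handle).path);
        } else {
            throw new IOException(
                    "Unknown implementation of ToyStateHandle: " + handle.getClass());
        }
    }

    // Convenience wrapper: true if the handle can be written, false otherwise.
    static boolean canSerialize(ToyStateHandle handle) {
        try {
            serialize(handle, new DataOutputStream(new ByteArrayOutputStream()));
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}

class PlaceholderHandleSketch {
    public static void main(String[] args) {
        // A concrete handle serializes; an unresolved placeholder does not.
        System.out.println(ToyMetadataSerializer.canSerialize(new ToyFileHandle("000012.sst")));
        System.out.println(ToyMetadataSerializer.canSerialize(new ToyPlaceholderHandle()));
    }
}
```

In this toy model, the error disappears only if every placeholder is replaced by a concrete handle before serialization, which is consistent with the reporter's suspicion that the savepoint path is receiving incremental-checkpoint placeholders.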



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
