[ 
https://issues.apache.org/jira/browse/FLINK-31560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704058#comment-17704058
 ] 

Brian Zhou commented on FLINK-31560:
------------------------------------

Thanks [~Yanfei Lei] for looking into this.

I am on the same team as Fan, so I can share some more information here.

 

We are running Flink in standalone mode on Kubernetes, and the state backend 
stores its data on an NFS mount.

Here is the configuration for checkpointing:

 
!image-2023-03-23-18-03-05-943.png!

And here is the status of the checkpoints/savepoints:

!image-2023-03-23-18-19-24-482.png!
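
Since the screenshots may not render in email, our checkpointing setup is roughly 
equivalent to the following programmatic configuration. This is only a sketch: the 
class name, interval, and NFS path are placeholders, and the actual values are in 
the first image above.

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB state backend with incremental snapshots enabled (true = incremental).
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Checkpoint interval is a placeholder; the real value is in the screenshot.
        env.enableCheckpointing(60_000);
        // Checkpoint storage on the NFS mount (placeholder path).
        env.getCheckpointConfig().setCheckpointStorage("file:///mnt/nfs/flink/checkpoints");
    }
}
{code}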

We are using the REST API 
[https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/#jobs-jobid-savepoints]
 to trigger savepoints, and it keeps retrying every 75 seconds. I want to ask 
whether a canonical savepoint is triggered by default.
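
For completeness, the trigger call looks roughly like the sketch below (the class 
name, JobManager address, and target directory are placeholders, not our actual 
values). If I read the 1.16 REST docs correctly, the request body also accepts an 
explicit {{formatType}}, so we could force CANONICAL instead of relying on the 
default:

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSavepointSketch {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://localhost:8081";               // placeholder address
        String jobId = "7354442cd6f7c121249360680c04284d";         // job id from the JM log
        String url = jobManager + "/jobs/" + jobId + "/savepoints";

        // Explicitly request a canonical savepoint (formatType per the 1.16 REST docs,
        // if I recall the field name correctly); target directory is a placeholder.
        String body = "{\"target-directory\": \"file:///mnt/nfs/flink/savepoints\","
                + " \"cancel-job\": false,"
                + " \"formatType\": \"CANONICAL\"}";

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The response should contain the trigger "request-id" for polling the status.
        System.out.println(response.body());
    }
}
{code}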

Here is the JM log: [^jobmanager_log.txt]. Currently there are three jobs in this 
cluster still stuck in this situation.

For reproduction: the job code is company-internal, so we cannot share it 
directly, but it simply reads Pravega data, performs a window aggregation, and 
prints the result; nothing else specific.
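
The job shape is roughly the following sketch. The class name and the source are 
stand-ins: the real job reads from Pravega via the Pravega Flink connector and 
uses our internal event types.

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAggregationSketch {
    public static void main(String[] args) throws Exception {
        // State backend / checkpointing configured as in the earlier snippet.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Pravega source.
        env.fromSequence(0, Long.MAX_VALUE)
                .map(i -> Tuple2.of("key-" + (i % 10), 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                .print();

        env.execute("window-aggregation-sketch");
    }
}
{code}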

 

 

> Savepoint failing to complete with incremental RocksDB statebackend
> -------------------------------------------------------------------
>
>                 Key: FLINK-31560
>                 URL: https://issues.apache.org/jira/browse/FLINK-31560
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.16.0
>            Reporter: Fan Yang
>            Priority: Major
>         Attachments: image-2023-03-23-18-03-05-943.png, 
> image-2023-03-23-18-19-24-482.png, jobmanager_log.txt
>
>
> Flink version: 1.16.0
>  
> We are using Flink to run some streaming applications with Pravega as source 
> and use window and reduce transformations. We use RocksDB state backend with 
> incremental checkpointing enabled. We don't enable the latest changelog state 
> backend.
> When we try to stop the job, we encounter issues with the savepoint failing 
> to complete. This happens most of the time. On rare occasions, the job gets 
> canceled suddenly and its savepoint completes successfully.
> Savepointing shows the error below:
>  
> {code:java}
> 2023-03-22 08:55:57,521 [jobmanager-io-thread-1] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 189 for job 7354442cd6f7c121249360680c04284d. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.16.0.jar:1.16.0]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:699) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandleMap(MetadataV2V3SerializerBase.java:813) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.java:344) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateCol(MetadataV2V3SerializerBase.java:269) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeSubtaskState(MetadataV2V3SerializerBase.java:262) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeSubtaskState(MetadataV3Serializer.java:142) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:122) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:146) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:87) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:82) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:333) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361) ~[flink-dist-1.16.0.jar:1.16.0]
>     ... 7 more
> {code}
>  
> Prior to Flink 1.16, we did not observe this error. Since 
> `PlaceholderStreamStateHandle` is used to mark reusable RocksDB data in an 
> incremental checkpoint, we believe the incremental checkpoint improvements 
> introduced in the Flink 1.16 release might be related to this issue.
> We require assistance in investigating this issue and finding a solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
