[ 
https://issues.apache.org/jira/browse/FLINK-32520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17739934#comment-17739934
 ] 

Gyula Fora commented on FLINK-32520:
------------------------------------

It's a bit difficult to understand the logs with the resource names in each log 
lines. You can configure the logger to get these from the MDC like in: 
[https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/conf/log4j-operator.properties#L26]

Otherwise I cannot really see anything suspicious, seems like the operator is 
always using the HA metadata during these upgrades (due to the LAST_STATE 
upgradeMode) which should always contain the latest checkpoint.

> FlinkDeployment recovered states from an obsolete savepoint when performing 
> an upgrade
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-32520
>                 URL: https://issues.apache.org/jira/browse/FLINK-32520
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: 1.13.1
>            Reporter: Ruibin Xing
>            Priority: Major
>         Attachments: flink_kubernetes_operator_0615.csv, 
> logs-06151328-06151332.csv
>
>
> Kubernetes Operator version: 1.5.0
>  
> When upgrading one of our Flink jobs, it recovered from a savepoint created 
> by the previous version of the job. The timeline of the job is as follows:
>  # I upgraded the job for the first time. The job created a savepoint and 
> successfully restored from it.
>  # The job was running fine and created several checkpoints.
>  # Later, I performed the second upgrade. Soon after submission and before 
> the JobManager stopped, I realized I made a mistake in the spec, so I quickly 
> did the third upgrade.
>  # After the job started, I found that it had recovered from the savepoint 
> created during the first upgrade.
>  
> It appears that there was an error when submitting the third upgrade. 
> However, I'm still not quite sure why this would cause Flink to use the 
> obsolete savepoint after investigating the code. The related logs for the 
> operator are attached below.
>  
> Although I haven't found the root cause, I came up with some possible fixes:
>  # Remove the {{lastSavepoint}} after a job has successfully restored from it.
>  # Add options for savepoint, similar to: 
> {{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}} The 
> operator should refuse to recover from the savepoint if the max age is 
> exceeded.
>  # Create a flag in the status that records savepoint states. Set the flag to 
> false when the savepoint starts and mark it as true when it successfully 
> ends. The job should report an error if the flag for the last savepoint is 
> false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to