[ 
https://issues.apache.org/jira/browse/FLINK-32520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruibin Xing updated FLINK-32520:
--------------------------------
    Description: 
Kubernetes Operator version: 1.5.0
 

When upgrading one of our Flink jobs, it recovered from a savepoint created by 
the previous version of the job. The timeline of the job is as follows:
 # I upgraded the job for the first time. The job created a savepoint and 
successfully restored from it.
 # The job was running fine and created several checkpoints.
 # Later, I performed the second upgrade. Soon after submission and before the 
JobManager stopped, I realized I made a mistake in the spec, so I quickly did 
the third upgrade.
 # After the job started, I found that it had recovered from the savepoint 
created during the first upgrade.

 

It appears that there was an error when submitting the third upgrade. However, 
I'm still not quite sure why this would cause Flink to use the obsolete 
savepoint after investigating the code. The related logs for the operator are 
attached below.
 

Although I haven't found the root cause, I came up with some possible fixes:
 # Remove the {{lastSavepoint}} after a job has successfully restored from it.
 # Add options for savepoint, similar to: 
{{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}} The 
operator should refuse to recover from the savepoint if the max age is exceeded.
 # Create a flag in the status that records savepoint states. Set the flag to 
false when the savepoint starts and mark it as true when it successfully ends. 
The job should report an error if the flag for the last savepoint is false.

  was:
Kubernetes Operator version: 1.5.0
 

When upgrading one of our Flink jobs, it recovered from a savepoint created by 
the previous version of the job. The timeline of the job is as follows:
 # I upgraded the job for the first time. The job created a savepoint and 
successfully restored from it.
 # The job was running fine and created several checkpoints.
 # Later, I performed the second upgrade. Soon after submission and before the 
JobManager stopped, I realized I made a mistake in the spec, so I quickly did 
the third upgrade.
 # After the job started, I found that it had recovered from the savepoint 
created during the first upgrade.

 

It appears that there was an error when submitting the third upgrade. However, 
I'm still not quite sure why this would cause Flink to use the obsolete 
savepoint after investigating the code. The related logs for the operator are 
attached below.
 

Although I haven't found the root cause, I came up with some possible fixes:
 # Remove the {{lastSavepoint}} after a job has successfully restored from it.

 # Add options for savepoint, similar to: 
{{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}} The 
operator should refuse to recover from the savepoint if the max age is exceeded.

 # Create a flag in the status that records savepoint states. Set the flag to 
false when the savepoint starts and mark it as true when it successfully ends. 
The job should report an error if the flag for the last savepoint is false.


> FlinkDeployment recovered states from an obsolete savepoint
> -----------------------------------------------------------
>
>                 Key: FLINK-32520
>                 URL: https://issues.apache.org/jira/browse/FLINK-32520
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kubernetes Operator
>    Affects Versions: 1.13.1
>            Reporter: Ruibin Xing
>            Priority: Major
>         Attachments: flink_kubernetes_operator_0615.csv
>
>
> Kubernetes Operator version: 1.5.0
>  
> When upgrading one of our Flink jobs, it recovered from a savepoint created 
> by the previous version of the job. The timeline of the job is as follows:
>  # I upgraded the job for the first time. The job created a savepoint and 
> successfully restored from it.
>  # The job was running fine and created several checkpoints.
>  # Later, I performed the second upgrade. Soon after submission and before 
> the JobManager stopped, I realized I made a mistake in the spec, so I quickly 
> did the third upgrade.
>  # After the job started, I found that it had recovered from the savepoint 
> created during the first upgrade.
>  
> It appears that there was an error when submitting the third upgrade. 
> However, I'm still not quite sure why this would cause Flink to use the 
> obsolete savepoint after investigating the code. The related logs for the 
> operator are attached below.
>  
> Although I haven't found the root cause, I came up with some possible fixes:
>  # Remove the {{lastSavepoint}} after a job has successfully restored from it.
>  # Add options for savepoint, similar to: 
> {{kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age}} The 
> operator should refuse to recover from the savepoint if the max age is 
> exceeded.
>  # Create a flag in the status that records savepoint states. Set the flag to 
> false when the savepoint starts and mark it as true when it successfully 
> ends. The job should report an error if the flag for the last savepoint is 
> false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to