gyfora commented on code in PR #223:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/223#discussion_r876581661


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -100,46 +103,49 @@ public void reconcile(FlinkDeployment flinkApp, Context 
context) throws Exceptio
             return;
         }
 
+        Configuration observeConfig = configManager.getObserveConfig(flinkApp);
         boolean specChanged = !currentDeploySpec.equals(lastReconciledSpec);
         if (specChanged) {
             if (newSpecIsAlreadyDeployed(flinkApp)) {
                 return;
             }
             LOG.debug("Detected spec change, starting upgrade process.");
-            Optional<UpgradeMode> availableUpgradeMode = 
getAvailableUpgradeMode(flinkApp);
-            if (availableUpgradeMode.isEmpty()) {
-                return;
-            }
-            UpgradeMode upgradeMode = availableUpgradeMode.get();
-
             JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = desiredJobSpec.getState();
             JobState stateAfterReconcile = currentJobState;
             if (currentJobState == JobState.RUNNING) {
                 if (desiredJobState == JobState.RUNNING) {
                     LOG.info("Upgrading/Restarting running job, suspending 
first...");
                 }
-                flinkService.cancelJob(flinkApp, upgradeMode);
+                Optional<UpgradeMode> availableUpgradeMode =
+                        getAvailableUpgradeMode(flinkApp, deployConfig);
+                if (availableUpgradeMode.isEmpty()) {
+                    return;
+                }
+                // We must record the upgrade mode used to the status later
+                desiredJobSpec.setUpgradeMode(availableUpgradeMode.get());

Review Comment:
   @wangyang0918 this is the part I was referring to. It is very important to 
know what upgradeMode we used during suspend to be able to verify/not-verify HA 
metadata. 
   
   If you are switching from without HA (from savepoint/stateless) the 
available upgrade mode is savepoint. In this case we record savepoint in the 
lastReconciledSpec upgrademode field so on restore we know not to enforce HA 
metadata validation because it cannot possibly be there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to