This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d382c74e [FLINK-29109] Generate random jobId for stateless upgrade mode irrespective of Flink version
d382c74e is described below

commit d382c74ea04fbe17ab41f42559d663d55d21763a
Author: Thomas Weise <t...@apache.org>
AuthorDate: Sat Dec 3 10:49:48 2022 -0500

    [FLINK-29109] Generate random jobId for stateless upgrade mode irrespective of Flink version
---
 .../reconciler/deployment/ApplicationReconciler.java |  6 ++----
 .../controller/FlinkDeploymentControllerTest.java    | 20 +++++---------------
 .../deployment/ApplicationReconcilerTest.java        | 11 +++--------
 3 files changed, 10 insertions(+), 27 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 4bb82c5d..7249aab3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
@@ -202,11 +201,10 @@ public class ApplicationReconciler
 
     private void setJobIdIfNecessary(
             FlinkDeploymentSpec spec, FlinkDeployment resource, Configuration deployConfig) {
+        // The jobId assigned by Flink would be constant,
+        // overwrite to avoid checkpoint path conflicts.
         // https://issues.apache.org/jira/browse/FLINK-19358
         // https://issues.apache.org/jira/browse/FLINK-29109
-        if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
-            return;
-        }
 
         if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) {
             // user managed, don't touch
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9669242e..24c4cc74 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -117,9 +117,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 3 : 4,
-                testController.getInternalStatusUpdateCount());
+        assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -142,9 +140,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 4 : 5,
-                testController.getInternalStatusUpdateCount());
+        assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -158,9 +154,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6,
-                testController.getInternalStatusUpdateCount());
+        assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -175,9 +169,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6,
-                testController.getInternalStatusUpdateCount());
+        assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
                 Optional.of(
@@ -203,9 +195,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING.name(),
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(
-                flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 6 : 7,
-                testController.getInternalStatusUpdateCount());
+        assertEquals(7, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
         reconciliationStatus = appCluster.getStatus().getReconciliationStatus();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 2d8d99b0..e788f706 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -241,14 +241,9 @@ public class ApplicationReconcilerTest {
 
     private void verifyJobId(
             FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
-        if (deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
-            assertNull(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
-        } else {
-            // jobId set by operator
-            assertEquals(jobId, status.getJobId());
-            assertEquals(
-                    conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString());
-        }
+        // jobId set by operator
+        assertEquals(jobId, status.getJobId());
+        assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString());
     }
 
     @Test

Reply via email to