This is an automated email from the ASF dual-hosted git repository.

morhidi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new d382c74e [FLINK-29109] Generate random jobId for stateless upgrade mode irrespective of Flink version d382c74e is described below commit d382c74ea04fbe17ab41f42559d663d55d21763a Author: Thomas Weise <t...@apache.org> AuthorDate: Sat Dec 3 10:49:48 2022 -0500 [FLINK-29109] Generate random jobId for stateless upgrade mode irrespective of Flink version --- .../reconciler/deployment/ApplicationReconciler.java | 6 ++---- .../controller/FlinkDeploymentControllerTest.java | 20 +++++--------------- .../deployment/ApplicationReconcilerTest.java | 11 +++-------- 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 4bb82c5d..7249aab3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; @@ -202,11 +201,10 @@ public class ApplicationReconciler private void setJobIdIfNecessary( FlinkDeploymentSpec spec, FlinkDeployment resource, Configuration 
deployConfig) { + // The jobId assigned by Flink would be constant, + // overwrite to avoid checkpoint path conflicts. // https://issues.apache.org/jira/browse/FLINK-19358 // https://issues.apache.org/jira/browse/FLINK-29109 - if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) { - return; - } if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) { // user managed, don't touch diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 9669242e..24c4cc74 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -117,9 +117,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 3 : 4, - testController.getInternalStatusUpdateCount()); + assertEquals(4, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -142,9 +140,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 
4 : 5, - testController.getInternalStatusUpdateCount()); + assertEquals(5, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -158,9 +154,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6, - testController.getInternalStatusUpdateCount()); + assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -175,9 +169,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 5 : 6, - testController.getInternalStatusUpdateCount()); + assertEquals(6, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -203,9 +195,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals( - flinkVersion.isNewerVersionThan(FlinkVersion.v1_15) ? 
6 : 7, - testController.getInternalStatusUpdateCount()); + assertEquals(7, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 2d8d99b0..e788f706 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -241,14 +241,9 @@ public class ApplicationReconcilerTest { private void verifyJobId( FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) { - if (deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) { - assertNull(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); - } else { - // jobId set by operator - assertEquals(jobId, status.getJobId()); - assertEquals( - conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString()); - } + // jobId set by operator + assertEquals(jobId, status.getJobId()); + assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString()); } @Test