This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f0914bb [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)
6f0914bb is described below

commit 6f0914bbd296c9daf2664afe0a77d1df4f2e157e
Author: Thomas Weise <t...@apache.org>
AuthorDate: Sun Sep 25 13:24:19 2022 -0400

    [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16)
---
 .../deployment/ApplicationReconciler.java          | 33 ++++++++++++++++++++++
 .../kubernetes/operator/TestingFlinkService.java   |  4 +++
 .../deployment/ApplicationReconcilerTest.java      | 29 ++++++++++++++++---
 3 files changed, 62 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index a4384edd..c0f53519 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -17,13 +17,16 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
@@ -172,6 +175,9 @@ public class ApplicationReconciler
             
flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, 
true);
             flinkService.waitForClusterShutdown(deployConfig);
         }
+
+        setJobIdIfNecessary(spec, status, deployConfig);
+
         eventRecorder.triggerEvent(
                 relatedResource,
                 EventRecorder.Type.Normal,
@@ -186,6 +192,33 @@ public class ApplicationReconciler
                 relatedResource.getMetadata(), spec, deployConfig, 
kubernetesClient);
     }
 
+    private void setJobIdIfNecessary(
+            FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig) {
+        // Pins the jobId for Flink <= 1.15: https://issues.apache.org/jira/browse/FLINK-19358
+        // https://issues.apache.org/jira/browse/FLINK-29109
+        if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
+            return;
+        }
+
+        if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) {
+            // fixed jobId already configured (user managed), don't touch
+            return;
+        }
+
+        // generate jobId initially or rotate on every deployment when mode is stateless
+        if (status.getJobStatus().getJobId() == null
+                || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
+            String jobId = JobID.generate().toHexString();
+            // record in status before deploying so a retried non-stateless deployment reuses it
+            status.getJobStatus().setJobId(jobId);
+            LOG.info("Assigning JobId override to {}", jobId);
+        }
+
+        // write the jobId recorded in status (new or kept) into the deploy config
+        String jobId = status.getJobStatus().getJobId();
+        LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
+        deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
+    }
+
     @Override
     protected void cancelJob(
             FlinkDeployment deployment,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index f97e5fa7..27b453d0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
@@ -150,6 +151,9 @@ public class TestingFlinkService extends 
AbstractFlinkService {
             throw new Exception("Cannot submit 2 application clusters at the 
same time");
         }
         JobID jobID = new JobID();
+        if (conf.contains(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)) {
+            jobID = 
JobID.fromHexString(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+        }
         JobStatusMessage jobStatusMessage =
                 new JobStatusMessage(
                         jobID,
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index fce09a14..0d4b30cc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
@@ -110,6 +112,9 @@ public class ApplicationReconcilerTest {
                         .f1
                         .isFirstDeployment());
 
+        JobID jobId = runningJobs.get(0).f1.getJobId();
+        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+
         // Test stateless upgrade
         FlinkDeployment statelessUpgrade = 
ReconciliationUtils.clone(deployment);
         
statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -138,10 +143,11 @@ public class ApplicationReconcilerTest {
         assertEquals(1, flinkService.getRunningCount());
         assertNull(runningJobs.get(0).f0);
 
-        deployment
-                .getStatus()
-                .getJobStatus()
-                .setJobId(runningJobs.get(0).f1.getJobId().toHexString());
+        // stateless upgrade must rotate the jobId (compare JobID with JobID, not with a String)
+        assertNotEquals(jobId, runningJobs.get(0).f1.getJobId());
+        jobId = runningJobs.get(0).f1.getJobId();
+
+        deployment.getStatus().getJobStatus().setJobId(jobId.toHexString());
 
         // Test stateful upgrade
         FlinkDeployment statefulUpgrade = ReconciliationUtils.clone(deployment);
@@ -166,6 +172,8 @@ public class ApplicationReconcilerTest {
                         .getLastSavepoint()
                         .getTriggerType());
 
+        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+
         deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
         deployment.getSpec().setRestartNonce(100L);
         deployment
@@ -208,6 +216,19 @@ public class ApplicationReconcilerTest {
 
         assertEquals(1, flinkService.getRunningCount());
         assertEquals("finished_sp", runningJobs.get(0).f0);
+        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, 
jobId);
+    }
+
+    private void verifyJobId(
+            FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
+        if (deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) {
+            assertNull(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); // not pinned
+        } else {
+            // jobId pinned by the operator: job and config must both carry the expected jobId
+            assertEquals(jobId, status.getJobId());
+            assertEquals(
+                    jobId.toHexString(), conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
+        }
     }
 
     @Test

Reply via email to