This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main in repository
https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6f0914bb [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16) 6f0914bb is described below commit 6f0914bbd296c9daf2664afe0a77d1df4f2e157e Author: Thomas Weise <t...@apache.org> AuthorDate: Sun Sep 25 13:24:19 2022 -0400 [FLINK-29109] Set jobId when upgrade mode is stateless (Flink < 1.16) --- .../deployment/ApplicationReconciler.java | 33 ++++++++++++++++++++++ .../kubernetes/operator/TestingFlinkService.java | 4 +++ .../deployment/ApplicationReconcilerTest.java | 29 ++++++++++++++++--- 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index a4384edd..c0f53519 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -17,13 +17,16 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; @@ -172,6 +175,9 @@ public class ApplicationReconciler flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true); flinkService.waitForClusterShutdown(deployConfig); } + + setJobIdIfNecessary(spec, status, deployConfig); + eventRecorder.triggerEvent( relatedResource, EventRecorder.Type.Normal, @@ -186,6 +192,33 @@ public class ApplicationReconciler relatedResource.getMetadata(), spec, deployConfig, kubernetesClient); } + private void setJobIdIfNecessary( + FlinkDeploymentSpec spec, FlinkDeploymentStatus status, Configuration deployConfig) { + // https://issues.apache.org/jira/browse/FLINK-19358 + // https://issues.apache.org/jira/browse/FLINK-29109 + if (spec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) { + return; + } + + if (deployConfig.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null) { + // user managed, don't touch + return; + } + + // generate jobId initially or rotate on every deployment when mode is stateless + if (status.getJobStatus().getJobId() == null + || spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) { + String jobId = JobID.generate().toHexString(); + // record before first deployment to ensure we use it on any retry + status.getJobStatus().setJobId(jobId); + LOG.info("Assigning JobId override to {}", jobId); + } + + String jobId = status.getJobStatus().getJobId(); + LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId); + deployConfig.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId); + } + @Override protected void cancelJob( FlinkDeployment deployment, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index f97e5fa7..27b453d0 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder; @@ -150,6 +151,9 @@ public class TestingFlinkService extends AbstractFlinkService { throw new Exception("Cannot submit 2 application clusters at the same time"); } JobID jobID = new JobID(); + if (conf.contains(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)) { + jobID = JobID.fromHexString(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); + } JobStatusMessage jobStatusMessage = new JobStatusMessage( jobID, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index fce09a14..0d4b30cc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -17,11 +17,13 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import 
org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; @@ -110,6 +112,9 @@ public class ApplicationReconcilerTest { .f1 .isFirstDeployment()); + JobID jobId = runningJobs.get(0).f1.getJobId(); + verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + // Test stateless upgrade FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment); statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); @@ -138,10 +143,11 @@ public class ApplicationReconcilerTest { assertEquals(1, flinkService.getRunningCount()); assertNull(runningJobs.get(0).f0); - deployment - .getStatus() - .getJobStatus() - .setJobId(runningJobs.get(0).f1.getJobId().toHexString()); + assertNotEquals( + runningJobs.get(0).f2.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId); + jobId = runningJobs.get(0).f1.getJobId(); + + deployment.getStatus().getJobStatus().setJobId(jobId.toHexString()); // Test stateful upgrade FlinkDeployment statefulUpgrade = ReconciliationUtils.clone(deployment); @@ -166,6 +172,8 @@ public class ApplicationReconcilerTest { .getLastSavepoint() .getTriggerType()); + verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); deployment.getSpec().setRestartNonce(100L); deployment @@ -208,6 +216,19 @@ public class ApplicationReconcilerTest { assertEquals(1, flinkService.getRunningCount()); assertEquals("finished_sp", runningJobs.get(0).f0); + 
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId); + } + + private void verifyJobId( + FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) { + if (deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15)) { + assertNull(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)); + } else { + // jobId set by operator + assertEquals(jobId, status.getJobId()); + assertEquals( + conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString()); + } } @Test