This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new e2b829c7 [fix] Use JobSpec arguments in Standalone Application Mode e2b829c7 is described below commit e2b829c7df7501760dec8f9aa47685c680b227cf Author: Avocadomaster <8803803+avocadomas...@users.noreply.github.com> AuthorDate: Thu Sep 22 11:33:38 2022 +0200 [fix] Use JobSpec arguments in Standalone Application Mode --- .../operator/config/FlinkConfigBuilder.java | 7 +++++ .../operator/config/FlinkConfigBuilderTest.java | 11 +++++--- .../CmdStandaloneJobManagerDecorator.java | 5 ++++ .../StandaloneKubernetesJobManagerParameters.java | 8 ++++++ .../KubernetesStandaloneClusterDescriptorTest.java | 30 ++++++++++++++++++++++ 5 files changed, 58 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java index d1423eb4..f8899803 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java @@ -57,6 +57,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH; @@ -274,6 +275,12 @@ public class FlinkConfigBuilder { effectiveConfig.set( ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass()); } + + if (jobSpec.getArgs() != null) { + effectiveConfig.set( + ApplicationConfiguration.APPLICATION_ARGS, + Arrays.asList(jobSpec.getArgs())); + } } else { effectiveConfig.set( DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java index 69ade6fb..4fe27b35 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java @@ -300,9 +300,11 @@ public class FlinkConfigBuilderTest { @Test public void testApplyJobOrSessionSpec() throws Exception { - flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true); + FlinkDeployment deploymentClone = ReconciliationUtils.clone(flinkDeployment); + deploymentClone.getSpec().getJob().setAllowNonRestoredState(true); + deploymentClone.getSpec().getJob().setArgs(new String[] {"--test", "123"}); var configuration = - new FlinkConfigBuilder(flinkDeployment, new Configuration()) + new FlinkConfigBuilder(deploymentClone, new Configuration()) .applyJobOrSessionSpec() .build(); Assertions.assertTrue( @@ -313,8 +315,11 @@ public class FlinkConfigBuilderTest { Assertions.assertEquals(SAMPLE_JAR, configuration.get(PipelineOptions.JARS).get(0)); Assertions.assertEquals( Integer.valueOf(2), configuration.get(CoreOptions.DEFAULT_PARALLELISM)); + Assertions.assertEquals( + List.of("--test", "123"), + configuration.get(ApplicationConfiguration.APPLICATION_ARGS)); - var dep = ReconciliationUtils.clone(flinkDeployment); + var dep = ReconciliationUtils.clone(deploymentClone); dep.getSpec().setTaskManager(new TaskManagerSpec()); dep.getSpec().getTaskManager().setReplicas(3); dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "4"); diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java index 9bfcc866..44ded3c2 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java @@ -85,6 +85,11 @@ public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDeco args.add(allowNonRestoredState.toString()); } + List<String> jobSpecArgs = kubernetesJobManagerParameters.getJobSpecArgs(); + if (jobSpecArgs != null) { + args.addAll(kubernetesJobManagerParameters.getJobSpecArgs()); + } + return args; } } diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java index a46172f4..b2ca29af 100644 --- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java +++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -92,4 +93,11 @@ public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManag public boolean isPipelineClasspathDefined() { return flinkConfig.contains(PipelineOptions.CLASSPATHS); } + + public List<String> getJobSpecArgs() { + if (flinkConfig.contains(ApplicationConfiguration.APPLICATION_ARGS)) { + return flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS); + } + return null; + } } diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java index 641ddfdc..5a0d6a0e 100644 --- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java +++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java @@ -176,4 +176,34 @@ public class KubernetesStandaloneClusterDescriptorTest { Constants.REST_PORT); assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL()); } + + @Test + public void testMainContainerArgsIntegrity() throws Exception { + ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification(); + + clusterDescriptor.deployApplicationCluster( + clusterSpecification, + new ApplicationConfiguration(new String[] {"--test", "123"}, "test")); + List<Deployment> deployments = + kubernetesClient + .apps() + .deployments() + .inNamespace(TestUtils.TEST_NAMESPACE) + .list() + .getItems(); + String expectedJMDeploymentName = TestUtils.CLUSTER_ID; + + Deployment jmDeployment = + deployments.stream() + .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName)) + .findFirst() + .orElse(null); + assertTrue( + jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream() + .anyMatch(c -> c.getArgs().contains("standalone-job"))); + + assertTrue( + jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream() + .anyMatch(c -> c.getArgs().contains("123"))); + } }