This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e2b829c7 [fix] Use JobSpec arguments in Standalone Application Mode
e2b829c7 is described below

commit e2b829c7df7501760dec8f9aa47685c680b227cf
Author: Avocadomaster <8803803+avocadomas...@users.noreply.github.com>
AuthorDate: Thu Sep 22 11:33:38 2022 +0200

    [fix] Use JobSpec arguments in Standalone Application Mode
---
 .../operator/config/FlinkConfigBuilder.java        |  7 +++++
 .../operator/config/FlinkConfigBuilderTest.java    | 11 +++++---
 .../CmdStandaloneJobManagerDecorator.java          |  5 ++++
 .../StandaloneKubernetesJobManagerParameters.java  |  8 ++++++
 .../KubernetesStandaloneClusterDescriptorTest.java | 30 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 3 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index d1423eb4..f8899803 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -57,6 +57,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collections;
 
 import static 
org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
@@ -274,6 +275,12 @@ public class FlinkConfigBuilder {
                 effectiveConfig.set(
                         ApplicationConfiguration.APPLICATION_MAIN_CLASS, 
jobSpec.getEntryClass());
             }
+
+            if (jobSpec.getArgs() != null) {
+                effectiveConfig.set(
+                        ApplicationConfiguration.APPLICATION_ARGS,
+                        Arrays.asList(jobSpec.getArgs()));
+            }
         } else {
             effectiveConfig.set(
                     DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.SESSION.getName());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 69ade6fb..4fe27b35 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -300,9 +300,11 @@ public class FlinkConfigBuilderTest {
 
     @Test
     public void testApplyJobOrSessionSpec() throws Exception {
-        flinkDeployment.getSpec().getJob().setAllowNonRestoredState(true);
+        FlinkDeployment deploymentClone = 
ReconciliationUtils.clone(flinkDeployment);
+        deploymentClone.getSpec().getJob().setAllowNonRestoredState(true);
+        deploymentClone.getSpec().getJob().setArgs(new String[] {"--test", 
"123"});
         var configuration =
-                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                new FlinkConfigBuilder(deploymentClone, new Configuration())
                         .applyJobOrSessionSpec()
                         .build();
         Assertions.assertTrue(
@@ -313,8 +315,11 @@ public class FlinkConfigBuilderTest {
         Assertions.assertEquals(SAMPLE_JAR, 
configuration.get(PipelineOptions.JARS).get(0));
         Assertions.assertEquals(
                 Integer.valueOf(2), 
configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+        Assertions.assertEquals(
+                List.of("--test", "123"),
+                configuration.get(ApplicationConfiguration.APPLICATION_ARGS));
 
-        var dep = ReconciliationUtils.clone(flinkDeployment);
+        var dep = ReconciliationUtils.clone(deploymentClone);
         dep.getSpec().setTaskManager(new TaskManagerSpec());
         dep.getSpec().getTaskManager().setReplicas(3);
         
dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(),
 "4");
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index 9bfcc866..44ded3c2 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -85,6 +85,11 @@ public class CmdStandaloneJobManagerDecorator extends 
AbstractKubernetesStepDeco
             args.add(allowNonRestoredState.toString());
         }
 
+        List<String> jobSpecArgs = 
kubernetesJobManagerParameters.getJobSpecArgs();
+        if (jobSpecArgs != null) {
+            args.addAll(kubernetesJobManagerParameters.getJobSpecArgs());
+        }
+
         return args;
     }
 }
diff --git 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index a46172f4..b2ca29af 100644
--- 
a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ 
b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -92,4 +93,11 @@ public class StandaloneKubernetesJobManagerParameters 
extends KubernetesJobManag
     public boolean isPipelineClasspathDefined() {
         return flinkConfig.contains(PipelineOptions.CLASSPATHS);
     }
+
+    public List<String> getJobSpecArgs() {
+        if (flinkConfig.contains(ApplicationConfiguration.APPLICATION_ARGS)) {
+            return flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS);
+        }
+        return null;
+    }
 }
diff --git 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
index 641ddfdc..5a0d6a0e 100644
--- 
a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
+++ 
b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -176,4 +176,34 @@ public class KubernetesStandaloneClusterDescriptorTest {
                         Constants.REST_PORT);
         assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
     }
+
+    @Test
+    public void testMainContainerArgsIntegrity() throws Exception {
+        ClusterSpecification clusterSpecification = 
TestUtils.createClusterSpecification();
+
+        clusterDescriptor.deployApplicationCluster(
+                clusterSpecification,
+                new ApplicationConfiguration(new String[] {"--test", "123"}, 
"test"));
+        List<Deployment> deployments =
+                kubernetesClient
+                        .apps()
+                        .deployments()
+                        .inNamespace(TestUtils.TEST_NAMESPACE)
+                        .list()
+                        .getItems();
+        String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+
+        Deployment jmDeployment =
+                deployments.stream()
+                        .filter(d -> 
d.getMetadata().getName().equals(expectedJMDeploymentName))
+                        .findFirst()
+                        .orElse(null);
+        assertTrue(
+                
jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+                        .anyMatch(c -> 
c.getArgs().contains("standalone-job")));
+
+        assertTrue(
+                
jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+                        .anyMatch(c -> c.getArgs().contains("123")));
+    }
 }

Reply via email to