This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new f5be73b9 [FLINK-32067] Do not configure invalid null podTemplate
f5be73b9 is described below

commit f5be73b9e8861fe7134c224d68eabcd10a9ebfd3
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri May 12 11:38:06 2023 +0200

    [FLINK-32067] Do not configure invalid null podTemplate
---
 .../operator/config/FlinkConfigBuilder.java        | 10 +++---
 .../operator/config/FlinkConfigBuilderTest.java    | 38 ++++++++++++++++++++--
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 63c00e29..dd2fd525 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -415,6 +415,7 @@ public class FlinkConfigBuilder {
                         ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
                         : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
 
+        Pod podTemplate;
         if (basicPod != null || appendPod != null) {
             Pod mergedPodTemplate =
                     mergePodTemplates(
@@ -422,11 +423,12 @@ public class FlinkConfigBuilder {
                             appendPod,
                             effectiveConfig.get(
                                     KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME));
-            Pod newPodTemplate = applyResourceToPodTemplate(mergedPodTemplate, resource);
-            effectiveConfig.setString(podConfigOption, createTempFile(newPodTemplate));
+            podTemplate = applyResourceToPodTemplate(mergedPodTemplate, resource);
         } else {
-            Pod newPodTemplate = applyResourceToPodTemplate(null, resource);
-            effectiveConfig.setString(podConfigOption, createTempFile(newPodTemplate));
+            podTemplate = applyResourceToPodTemplate(null, resource);
+        }
+        if (podTemplate != null) {
+            effectiveConfig.setString(podConfigOption, createTempFile(podTemplate));
         }
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index d789c13f..a90fe373 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.kubernetes.operator.api.CrdConstants;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.spec.Resource;
 import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
@@ -53,7 +54,7 @@ import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.Pod;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -82,8 +83,8 @@ public class FlinkConfigBuilderTest {
     private static FlinkDeployment flinkDeployment;
     private static final String CUSTOM_LOG_CONFIG = "rootLogger.level = INFO";
 
-    @BeforeAll
-    public static void prepareFlinkDeployment() {
+    @BeforeEach
+    public void prepareFlinkDeployment() {
         flinkDeployment = TestUtils.buildApplicationCluster();
         final Container container0 = new Container();
         container0.setName("container0");
@@ -222,6 +223,37 @@ public class FlinkConfigBuilderTest {
                         Pod.class);
         Assertions.assertEquals("container0", jmPod.getSpec().getContainers().get(0).getName());
         Assertions.assertEquals("container0", tmPod.getSpec().getContainers().get(0).getName());
+
+        flinkDeployment.getSpec().setPodTemplate(null);
+        flinkDeployment.getSpec().setTaskManager(null);
+        flinkDeployment.getSpec().setJobManager(null);
+        configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyCommonPodTemplate()
+                        .applyTaskManagerSpec()
+                        .applyJobManagerSpec()
+                        .build();
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE));
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
+
+        flinkDeployment.getSpec().setTaskManager(new TaskManagerSpec());
+        flinkDeployment.getSpec().setJobManager(new JobManagerSpec());
+        configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyCommonPodTemplate()
+                        .applyTaskManagerSpec()
+                        .applyJobManagerSpec()
+                        .build();
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE));
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
+        Assertions.assertFalse(
+                configuration.contains(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
     }
 
     @Test

Reply via email to