This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 5bf7973aee353501fcd793e49e3e32fc989605ca
Author: Thomas Weise <thomas_we...@apple.com>
AuthorDate: Fri Feb 4 09:05:48 2022 -0800

    Support pod template merging
---
 .gitignore                                         |  2 +
 .../kubernetes/operator/utils/FlinkUtils.java      | 59 +++++++++++++++++++-
 .../kubernetes/operator/utils/FlinkUtilsTest.java  | 65 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index ceb6113..95cd8a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ buildNumber.properties
 .mvn/timing.properties
 
 .idea
+*.iml
+
diff --git 
a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java 
b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f53f284..18fd3c1 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -36,6 +36,10 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.util.StringUtils;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
@@ -48,12 +52,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.concurrent.Executors;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUtils.class);
+    private static final ObjectMapper MAPPER = new ObjectMapper();
 
     public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
         String namespace = flinkApp.getMetadata().getNamespace();
@@ -118,7 +124,10 @@ public class FlinkUtils {
                 if (spec.getJobManager().getPodTemplate() != null) {
                     effectiveConfig.set(
                             KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
-                            
createTempFile(spec.getJobManager().getPodTemplate()));
+                            createTempFile(
+                                    mergePodTemplates(
+                                            spec.getPodTemplate(),
+                                            
spec.getJobManager().getPodTemplate())));
                 }
             }
 
@@ -141,7 +150,10 @@ public class FlinkUtils {
                 if (spec.getTaskManager().getPodTemplate() != null) {
                     effectiveConfig.set(
                             KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
-                            
createTempFile(spec.getTaskManager().getPodTemplate()));
+                            createTempFile(
+                                    mergePodTemplates(
+                                            spec.getPodTemplate(),
+                                            
spec.getTaskManager().getPodTemplate())));
                 }
             }
 
@@ -169,6 +181,49 @@ public class FlinkUtils {
         return tmp.getAbsolutePath();
     }
 
+    public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
+        if (fromPod == null) {
+            return toPod;
+        } else if (toPod == null) {
+            return fromPod;
+        }
+        JsonNode node1 = MAPPER.valueToTree(toPod);
+        JsonNode node2 = MAPPER.valueToTree(fromPod);
+        mergeInto(node1, node2);
+        try {
+            return MAPPER.treeToValue(node1, Pod.class);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
+        Iterator<String> fieldNames = fromNode.fieldNames();
+        while (fieldNames.hasNext()) {
+            String fieldName = fieldNames.next();
+            JsonNode toChildNode = toNode.get(fieldName);
+            JsonNode fromChildNode = fromNode.get(fieldName);
+
+            if (toChildNode != null && toChildNode.isArray() && 
fromChildNode.isArray()) {
+                // TODO: does merging arrays even make sense or should it just 
override?
+                for (int i = 0; i < fromChildNode.size(); i++) {
+                    JsonNode updatedChildNode = fromChildNode.get(i);
+                    if (toChildNode.size() <= i) {
+                        // append new node
+                        ((ArrayNode) toChildNode).add(updatedChildNode);
+                    }
+                    mergeInto(toChildNode.get(i), updatedChildNode);
+                }
+            } else if (toChildNode != null && toChildNode.isObject()) {
+                mergeInto(toChildNode, fromChildNode);
+            } else {
+                if (toNode instanceof ObjectNode) {
+                    ((ObjectNode) toNode).replace(fieldName, fromChildNode);
+                }
+            }
+        }
+    }
+
     public static ClusterClient<String> getRestClusterClient(Configuration 
config)
             throws Exception {
         final String clusterId = 
config.get(KubernetesConfigOptions.CLUSTER_ID);
diff --git 
a/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java 
b/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
new file mode 100644
index 0000000..cf95292
--- /dev/null
+++ 
b/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** FlinkUtilsTest. */
+public class FlinkUtilsTest {
+
+    @Test
+    public void testMergePods() throws Exception {
+
+        Container container1 = new Container();
+        container1.setName("container1");
+        Container container2 = new Container();
+        container2.setName("container2");
+
+        PodSpec podSpec1 = new PodSpec();
+        podSpec1.setHostname("pod1 hostname");
+        podSpec1.setContainers(Arrays.asList(container2));
+        Pod pod1 = new Pod();
+        pod1.setApiVersion("pod1 api version");
+        pod1.setSpec(podSpec1);
+
+        PodSpec podSpec2 = new PodSpec();
+        podSpec2.setHostname("pod2 hostname");
+        podSpec2.setContainers(Arrays.asList(container1, container2));
+        Pod pod2 = new Pod();
+        pod2.setApiVersion("pod2 api version");
+        pod2.setSpec(podSpec2);
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode node1 = mapper.valueToTree(pod1);
+        JsonNode node2 = mapper.valueToTree(pod2);
+
+        Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);
+
+        Assert.assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+        Assert.assertEquals(pod2.getSpec().getContainers(), 
mergedPod.getSpec().getContainers());
+    }
+}

Reply via email to