This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 5bf7973aee353501fcd793e49e3e32fc989605ca
Author: Thomas Weise <thomas_we...@apple.com>
AuthorDate: Fri Feb 4 09:05:48 2022 -0800

    Support pod template merging
---
 .gitignore                                        |  2 +
 .../kubernetes/operator/utils/FlinkUtils.java     | 59 +++++++++++++++++++-
 .../kubernetes/operator/utils/FlinkUtilsTest.java | 65 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index ceb6113..95cd8a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ buildNumber.properties
 .mvn/timing.properties
 .idea
+*.iml
+
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f53f284..18fd3c1 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -36,6 +36,10 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
 import org.apache.flink.util.StringUtils;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
@@ -48,12 +52,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.concurrent.Executors;
 
 /** Flink Utility methods used by the operator. */
 public class FlinkUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+    private static final ObjectMapper MAPPER = new ObjectMapper();
 
     public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
         String namespace = flinkApp.getMetadata().getNamespace();
@@ -118,7 +124,10 @@ public class FlinkUtils {
         if (spec.getJobManager().getPodTemplate() != null) {
             effectiveConfig.set(
                     KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
-                    createTempFile(spec.getJobManager().getPodTemplate()));
+                    createTempFile(
+                            mergePodTemplates(
+                                    spec.getPodTemplate(),
+                                    spec.getJobManager().getPodTemplate())));
         }
     }
 
@@ -141,7 +150,10 @@ public class FlinkUtils {
         if (spec.getTaskManager().getPodTemplate() != null) {
             effectiveConfig.set(
                     KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
-                    createTempFile(spec.getTaskManager().getPodTemplate()));
+                    createTempFile(
+                            mergePodTemplates(
+                                    spec.getPodTemplate(),
+                                    spec.getTaskManager().getPodTemplate())));
         }
     }
 
@@ -169,6 +181,49 @@ public class FlinkUtils {
         return tmp.getAbsolutePath();
     }
 
+    public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
+        if (fromPod == null) {
+            return toPod;
+        } else if (toPod == null) {
+            return fromPod;
+        }
+        JsonNode node1 = MAPPER.valueToTree(toPod);
+        JsonNode node2 = MAPPER.valueToTree(fromPod);
+        mergeInto(node1, node2);
+        try {
+            return MAPPER.treeToValue(node1, Pod.class);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    private static void mergeInto(JsonNode toNode, JsonNode fromNode) {
+        Iterator<String> fieldNames = fromNode.fieldNames();
+        while (fieldNames.hasNext()) {
+            String fieldName = fieldNames.next();
+            JsonNode toChildNode = toNode.get(fieldName);
+            JsonNode fromChildNode = fromNode.get(fieldName);
+
+            if (toChildNode != null && toChildNode.isArray() && fromChildNode.isArray()) {
+                // TODO: does merging arrays even make sense or should it just override?
+                for (int i = 0; i < fromChildNode.size(); i++) {
+                    JsonNode updatedChildNode = fromChildNode.get(i);
+                    if (toChildNode.size() <= i) {
+                        // append new node
+                        ((ArrayNode) toChildNode).add(updatedChildNode);
+                    }
+                    mergeInto(toChildNode.get(i), updatedChildNode);
+                }
+            } else if (toChildNode != null && toChildNode.isObject()) {
+                mergeInto(toChildNode, fromChildNode);
+            } else {
+                if (toNode instanceof ObjectNode) {
+                    ((ObjectNode) toNode).replace(fieldName, fromChildNode);
+                }
+            }
+        }
+    }
+
     public static ClusterClient<String> getRestClusterClient(Configuration config)
             throws Exception {
         final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
diff --git a/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
new file mode 100644
index 0000000..cf95292
--- /dev/null
+++ b/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+/** FlinkUtilsTest. */
+public class FlinkUtilsTest {
+
+    @Test
+    public void testMergePods() throws Exception {
+
+        Container container1 = new Container();
+        container1.setName("container1");
+        Container container2 = new Container();
+        container2.setName("container2");
+
+        PodSpec podSpec1 = new PodSpec();
+        podSpec1.setHostname("pod1 hostname");
+        podSpec1.setContainers(Arrays.asList(container2));
+        Pod pod1 = new Pod();
+        pod1.setApiVersion("pod1 api version");
+        pod1.setSpec(podSpec1);
+
+        PodSpec podSpec2 = new PodSpec();
+        podSpec2.setHostname("pod2 hostname");
+        podSpec2.setContainers(Arrays.asList(container1, container2));
+        Pod pod2 = new Pod();
+        pod2.setApiVersion("pod2 api version");
+        pod2.setSpec(podSpec2);
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode node1 = mapper.valueToTree(pod1);
+        JsonNode node2 = mapper.valueToTree(pod2);
+
+        Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);
+
+        Assert.assertEquals(pod2.getApiVersion(), mergedPod.getApiVersion());
+        Assert.assertEquals(pod2.getSpec().getContainers(), mergedPod.getSpec().getContainers());
+    }
+}
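
For reference, a minimal sketch (not part of the commit) of how the merge is expected to behave when a
deployment-level pod template is combined with a JobManager-specific one. The label, hostname, and
variable names below are made up for illustration; only Pod, PodBuilder, and FlinkUtils.mergePodTemplates
come from the code above.

    import io.fabric8.kubernetes.api.model.Pod;
    import io.fabric8.kubernetes.api.model.PodBuilder;
    import org.apache.flink.kubernetes.operator.utils.FlinkUtils;

    // Common template shared by all pods of the deployment (hypothetical values).
    Pod commonTemplate =
            new PodBuilder()
                    .withNewMetadata().addToLabels("team", "data").endMetadata()
                    .withNewSpec().withHostname("common-host").endSpec()
                    .build();

    // JobManager-specific template that only overrides the hostname.
    Pod jmTemplate =
            new PodBuilder()
                    .withNewSpec().withHostname("jm-host").endSpec()
                    .build();

    Pod merged = FlinkUtils.mergePodTemplates(commonTemplate, jmTemplate);
    // merged.getSpec().getHostname() is "jm-host": fields present in the second
    // (role-specific) template replace those of the first.
    // merged.getMetadata().getLabels() still contains {team=data}: fields absent
    // from the second template are kept from the common template.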