This is an automated email from the ASF dual-hosted git repository. nicknezis pushed a commit to branch nicknezis/k8s-labels in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 87d49a704b13f3a80c9a46e00b9dd5cf61fc933d Author: Nicholas Nezis <[email protected]> AuthorDate: Wed Jul 7 21:52:41 2021 -0400 Add support for dynamic k8s labels --- .../scheduler/kubernetes/KubernetesContext.java | 43 +++++++++++++--------- .../heron/scheduler/kubernetes/V1Controller.java | 16 +++++--- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java index f6359b8..b87b267 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java @@ -92,6 +92,11 @@ public final class KubernetesContext extends Context { "heron.kubernetes.pod.annotation."; public static final String HERON_KUBERNETES_SERVICE_ANNOTATION = "heron.kubernetes.service.annotation."; + public static final String HERON_KUBERNETES_POD_LABEL = + "heron.kubernetes.pod.annotation."; + public static final String HERON_KUBERNETES_SERVICE_LABEL = + "heron.kubernetes.service.annotation."; + private KubernetesContext() { } @@ -162,6 +167,22 @@ public final class KubernetesContext extends Context { return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH); } + public static Map<String, String> getPodLabels(Config config) { + return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_LABEL); + } + + public static Map<String, String> getServiceLabels(Config config) { + return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_LABEL); + } + + public static Map<String, String> getPodAnnotations(Config config) { + return getConfigItemsByPrefix(config, HERON_KUBERNETES_POD_ANNOTATION); + } + + public static Map<String, String> getServiceAnnotations(Config config) { + return getConfigItemsByPrefix(config, HERON_KUBERNETES_SERVICE_ANNOTATION); + } + static Set<String> getConfigKeys(Config config, String keyPrefix) { Set<String> annotations = new HashSet<>(); for (String s : config.getKeySet()) { @@ -172,26 +193,14 @@ public final class KubernetesContext extends Context { return annotations; } - public static Map<String, String> getPodAnnotations(Config config) { - final Map<String, String> annotations = new HashMap<>(); - final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_POD_ANNOTATION); - for (String s : keys) { - String value = config.getStringValue(s); - annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_POD_ANNOTATION, - ""), value); - } - return annotations; - } - - public static Map<String, String> getServiceAnnotations(Config config) { - final Map<String, String> annotations = new HashMap<>(); - final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_SERVICE_ANNOTATION); + private static Map<String, String> getConfigItemsByPrefix(Config config, String keyPrefix) { + final Map<String, String> results = new HashMap<>(); + final Set<String> keys = getConfigKeys(config, keyPrefix); for (String s : keys) { String value = config.getStringValue(s); - annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_SERVICE_ANNOTATION, - ""), value); + results.put(s.replaceFirst(keyPrefix, ""), value); } - return annotations; + return results; } public static boolean hasContainerVolume(Config config) { diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java index 59399b6..2bf815b 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java @@ -327,12 +327,13 @@ public class V1Controller extends KubernetesController { final V1ObjectMeta objectMeta = new V1ObjectMeta(); objectMeta.name(topologyName); objectMeta.annotations(getServiceAnnotations()); + objectMeta.setLabels(getServiceLabels()); service.setMetadata(objectMeta); // create the headless service final V1ServiceSpec serviceSpec = new V1ServiceSpec(); serviceSpec.clusterIP("None"); - serviceSpec.setSelector(getMatchLabels(topologyName)); + serviceSpec.setSelector(getPodMatchLabels(topologyName)); service.setSpec(serviceSpec); @@ -363,14 +364,14 @@ public class V1Controller extends KubernetesController { // add selector match labels "app=heron" and "topology=topology-name" // so the we know which pods to manage final V1LabelSelector selector = new V1LabelSelector(); - selector.matchLabels(getMatchLabels(topologyName)); + selector.matchLabels(getPodMatchLabels(topologyName)); statefulSetSpec.selector(selector); // create a pod template final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec(); // set up pod meta - final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getLabels(topologyName)); + final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName)); Map<String, String> annotations = new HashMap<>(); annotations.putAll(getPodAnnotations()); annotations.putAll(getPrometheusAnnotations()); @@ -408,20 +409,25 @@ public class V1Controller extends KubernetesController { return annotations; } - private Map<String, String> getMatchLabels(String topologyName) { + private Map<String, String> getPodMatchLabels(String topologyName) { final Map<String, String> labels = new HashMap<>(); labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); return labels; } - private Map<String, String> getLabels(String topologyName) { + private Map<String, String> getPodLabels(String topologyName) { final Map<String, String> labels = new HashMap<>(); labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE); labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName); + labels.putAll(KubernetesContext.getPodLabels(getConfiguration())); return labels; } + private Map<String, String> getServiceLabels() { + return KubernetesContext.getServiceLabels(getConfiguration()); + } + private V1PodSpec getPodSpec(List<String> executorCommand, Resource resource, int numberOfInstances) { final V1PodSpec podSpec = new V1PodSpec();
