This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new bded8f5 Added support for adding Kubernetes annotations to the
topology pod and service (#3699)
bded8f5 is described below
commit bded8f5f48199a0d38f5808604fd6e387a7a2d22
Author: Nicholas Nezis <[email protected]>
AuthorDate: Thu Jul 8 09:00:41 2021 -0400
Added support for adding Kubernetes annotations to the topology pod and
service (#3699)
---
.../scheduler/kubernetes/KubernetesContext.java | 44 +++++++++++++++++++++-
.../heron/scheduler/kubernetes/V1Controller.java | 30 +++++++++++----
2 files changed, 66 insertions(+), 8 deletions(-)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index f938cd0..f6359b8 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -19,6 +19,11 @@
package org.apache.heron.scheduler.kubernetes;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
@@ -44,7 +49,7 @@ public final class KubernetesContext extends Context {
* provided in the Resource Limit. This mode effectively guarantees the
* cpu and memory will be reserved.
*/
- EQUAL_TO_LIMIT;
+ EQUAL_TO_LIMIT
}
/**
* This config item is used to determine how to configure the K8s Resource
Request.
@@ -83,6 +88,11 @@ public final class KubernetesContext extends Context {
public static final String HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH =
"heron.kubernetes.container.volumeMount.path";
+ public static final String HERON_KUBERNETES_POD_ANNOTATION =
+ "heron.kubernetes.pod.annotation.";
+ public static final String HERON_KUBERNETES_SERVICE_ANNOTATION =
+ "heron.kubernetes.service.annotation.";
+
private KubernetesContext() {
}
@@ -152,6 +162,38 @@ public final class KubernetesContext extends Context {
return config.getStringValue(HERON_KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
}
+ static Set<String> getConfigKeys(Config config, String keyPrefix) {
+ Set<String> annotations = new HashSet<>();
+ for (String s : config.getKeySet()) {
+ if (s.startsWith(keyPrefix)) {
+ annotations.add(s);
+ }
+ }
+ return annotations;
+ }
+
+ public static Map<String, String> getPodAnnotations(Config config) {
+ final Map<String, String> annotations = new HashMap<>();
+ final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_POD_ANNOTATION);
+ for (String s : keys) {
+ String value = config.getStringValue(s);
+ annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_POD_ANNOTATION,
+ ""), value);
+ }
+ return annotations;
+ }
+
+ public static Map<String, String> getServiceAnnotations(Config config) {
+ final Map<String, String> annotations = new HashMap<>();
+ final Set<String> keys = getConfigKeys(config, HERON_KUBERNETES_SERVICE_ANNOTATION);
+ for (String s : keys) {
+ String value = config.getStringValue(s);
+ annotations.put(s.replaceFirst(KubernetesContext.HERON_KUBERNETES_SERVICE_ANNOTATION,
+ ""), value);
+ }
+ return annotations;
+ }
+
public static boolean hasContainerVolume(Config config) {
final String name = getContainerVolumeName(config);
final String path = getContainerVolumeMountPath(config);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 2056372..59399b6 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -104,7 +104,7 @@ public class V1Controller extends KubernetesController {
final Resource containerResource = getContainerResource(packingPlan);
- final V1Service topologyService = createTopologyyService();
+ final V1Service topologyService = createTopologyService();
try {
final V1Service response =
coreClient.createNamespacedService(getNamespace(), topologyService,
null,
@@ -163,7 +163,7 @@ public class V1Controller extends KubernetesController {
final int newContainerCount = currentContainerCount +
containersToAdd.size();
try {
- patchStatefulsetReplicas(newContainerCount);
+ patchStatefulSetReplicas(newContainerCount);
} catch (ApiException ae) {
throw new TopologyRuntimeManagementException(
ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
@@ -185,14 +185,14 @@ public class V1Controller extends KubernetesController {
final int newContainerCount = currentContainerCount -
containersToRemove.size();
try {
- patchStatefulsetReplicas(newContainerCount);
+ patchStatefulSetReplicas(newContainerCount);
} catch (ApiException e) {
throw new TopologyRuntimeManagementException(
e.getMessage() + "\ndetails\n" + e.getResponseBody());
}
}
- private void patchStatefulsetReplicas(int replicas) throws ApiException {
+ private void patchStatefulSetReplicas(int replicas) throws ApiException {
final String body =
String.format(JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
replicas);
@@ -317,7 +317,7 @@ public class V1Controller extends KubernetesController {
return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}",
ENV_SHARD_ID, ENV_SHARD_ID);
}
- private V1Service createTopologyyService() {
+ private V1Service createTopologyService() {
final String topologyName = getTopologyName();
final Config runtimeConfiguration = getRuntimeConfiguration();
@@ -326,6 +326,7 @@ public class V1Controller extends KubernetesController {
// setup service metadata
final V1ObjectMeta objectMeta = new V1ObjectMeta();
objectMeta.name(topologyName);
+ objectMeta.annotations(getServiceAnnotations());
service.setMetadata(objectMeta);
// create the headless service
@@ -370,7 +371,10 @@ public class V1Controller extends KubernetesController {
// set up pod meta
final V1ObjectMeta templateMetaData = new
V1ObjectMeta().labels(getLabels(topologyName));
- templateMetaData.annotations(getPrometheusAnnotations());
+ Map<String, String> annotations = new HashMap<>();
+ annotations.putAll(getPodAnnotations());
+ annotations.putAll(getPrometheusAnnotations());
+ templateMetaData.annotations(annotations);
podTemplateSpec.setMetadata(templateMetaData);
final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID);
@@ -383,6 +387,18 @@ public class V1Controller extends KubernetesController {
return statefulSet;
}
+ private Map<String, String> getPodAnnotations() {
+ Config config = getConfiguration();
+ final Map<String, String> annotations = KubernetesContext.getPodAnnotations(config);
+ return annotations;
+ }
+
+ private Map<String, String> getServiceAnnotations() {
+ Config config = getConfiguration();
+ final Map<String, String> annotations = KubernetesContext.getServiceAnnotations(config);
+ return annotations;
+ }
+
private Map<String, String> getPrometheusAnnotations() {
final Map<String, String> annotations = new HashMap<>();
annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
@@ -529,7 +545,7 @@ public class V1Controller extends KubernetesController {
if (remoteDebugEnabled) {
IntStream.range(0, numberOfInstances).forEach(i -> {
final String portName =
- KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + String.valueOf(i);
+ KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
final V1ContainerPort port = new V1ContainerPort();
port.setName(portName);
port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT +
i);