This is an automated email from the ASF dual-hosted git repository.
joshfischer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 837c4f2 [HERON-3707] ConfigMap Pod Template Support (#3710)
837c4f2 is described below
commit 837c4f2e15a0759abfa7061728d6f5d1ba98151a
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Tue Nov 2 09:40:39 2021 -0400
[HERON-3707] ConfigMap Pod Template Support (#3710)
* [Kubernetes] setup basic mount info for Pod ConfigMap.
* [Kubernetes] updated function signature to handle Pod Template ConfigMap
name.
* [kubernetes] extracting Pod Template ConfigMap name from <Config>.
* [kubernetes] checking for Pod Template ConfigMap and appropriately adding
to Stateful Set.
* [kubernetes] Java Style lint fix.
* [Tests] Kubernetes Controller tests for Pod Template ConfigMap.
* [Tests] Kubernetes Constants and Context tests for Pod Template ConfigMap.
* [Tests] Kubernetes V1Controller test suite stubbed.
* [Tests] Java style lint fixed.
* [Tests] Kubernetes V1Controller Pod Template ConfigMap volume mount.
* [Kubernetes] cleaned up to begin work on <loadPodFromTemplate>.
* [Kubernetes] created <loadPodFromTemplate>.
* [Tests] Begun mock test setup of <loadPodFromTemplate>.
* [Kubernetes] style check/linting fix.
* [Kubernetes] refactoring <V1Controller> and <KubernetesController>.
* [Kubernetes] added description to failed to locate exception.
* [Tests] <loadPodFromTemplate> Pod Template checks.
* [Kubernetes] check for no ConfigMaps set.
* [Tests] working on mocking null list of V1ConfigMapList.
* [Kubernetes] refactoring <loadPodFromTemplate>
Adding checks for null pointers. Default constructed V1 objects tend to
have uninitialised fields set to null by default. Extracting <getConfigMaps> to
method to support mocking.
* [Tests] Stubbed <getConfigMaps> and testing <loadPodFromTemplate>.
* [Kubernetes] <loadPodFromTemplate> adjusted to get <V1PodTemplateSpec>
from <V1PodTemplate>.
* [Kubernetes] check for empty Pod Template in ConfigMap.
* [Tests] Valid Pod Template test.
* [Tests] Invalid Pod Template.
* [Tests] refactored test data to their respective tests.
* [Kubernetes] refactored <loadPodFromTemplate> for readability.
* [Kubernetes] params for <getConfigMaps> tweaked.
Judging from
<release-11.0.0/kubernetes/src/main/java/io/kubernetes/client/openapi/apis/CoreV1Api.java>
"optional" means the field can be set to <null>.
* [Kubernetes] <getPodTemplateLocation> extracting ConfigMap and Pod
Template names.
* [Tests] <getPodTemplateLocation> for correct and incorrect parsing.
* [Kubernetes] <getPodTemplateLocation> catching empty names.
* [Tests] <getPodTemplateLocation> separated tests for ConfigMap and Pod
Template names.
* [Kubernetes] updated <loadPodFromTemplate> to use ConfigMap and Pod
Template names.
* [Tests] updated tests for <loadPodFromTemplate> to use ConfigMap and Pod
Template names.
* [Kubernetes] added INFO logging to <loadPodFromTemplate> for the deployed
Pod Template.
* [Kubernetes] Bug fixes in error messages for <loadPodFromTemplate>.
* [Kubernetes] bug fix and test for missing delimiter in
<getPodTemplateLocation>.
* [Kubernetes] <getConfigMaps> namespace access updated.
* [Kubernetes] <configureRBAC> basic logic.
TODO: get API key for K8s.
* [Tests] cleaned up <V1Controller> tests.
* [Kubernetes] <configureRBAC> more detailed error log.
* [Kubernetes] <configureRBAC> role configurations.
* [Kubernetes] refactored <loadFromPodTemplate>.
* [Tests] switched to <ConfigMapBuilder> in <V1ControllerTest>.
* [Kubernetes] switched to <V1RoleBuilder> in <configureRBAC>.
* [Kubernetes] made <loadPodFromTemplate> protected.
Removed illegal reflection access to avoid support issues with newer
testing frameworks.
* [Kubernetes] removed <configureRBAC>.
RBAC must be configured using Role/ClusterRole and
RoleBinding/ClusterRoleBinding to the Heron <heron-apiserver> ServiceAccount.
* [Kubernetes] <getPodTemplateLocation> error message passed up.
* [Kubernetes] refactored <getPodSpec> to <finalizePodSpec>.
Heron should have the final say on the Pod Spec. This is as much a point of
security as an operational one.
* [Kubernetes] Added boot flag to disable Pod Templates.
* [Tests] testing to validate boot flag for disabled Pod Templates.
* [Kubernetes] Wiring in boot flag to disable Pod Templates in
<loadPodFromTemplate>.
* [Tests] disabled Pod Templates output validation.
* [Kubernetes] Added class scoped variable <isPodTemplateDisabled>.
* [Kubernetes] <getContainer> modified to utilise supplied executor
container.
* [Kubernetes] <getContainer> <V1EnvVar>s.
Environment variables merged with Heron defaults taking precedence.
* [Kubernetes] <getContainer> Limits.
Resource Limits merged with Heron defaults taking precedence.
* [Kubernetes] disabled Pod Templates will return error when attempting to
submit.
* [Kubernetes] <API Server> configs.
Updated RBAC API version and added a commented flag command to disable to
Pod Templates.
* [Kubernetes] <configureExecutorContainer>
Refactored <getContainer> to <configureExecutorContainer>. Permitting
additional containers for side-car purposes.
* [Kubernetes] <configureExecutorContainer>
Switched to <TreeSet> with custom comparator for <V1EnvVar> name.
<V1EnvVar>'s comparator performs a complete element wise comparison.
* [Kubernetes] <configureExecutorContainer>
Merged executor container ports with ports provided in Pod Template. Heron
defaults take precedence.
* [Kubernetes] <mountVolumeIfPresent>
Merge volume mounts with those provided in Pod Template. Heron defaults
take precedence.
* [Kubernetes] <V1Controller>
general cleanup of new code and comments.
* [Kubernetes] <mountVolumeIfPresent>
Error check for malformed Pod Template.
* [Kubernetes] <configureContainerPorts>
Refactored <getContainerPorts> and moved port merge with error handling to
it.
* [Kubernetes] <configureExecutorContainer>
Removed a redundant <limit> put into the HashMap.
* [Kubernetes] <addVolumesIfPresent>
Merging Pod Template volume configs with Heron defaults. Heron values take
precedence.
* [Kubernetes] <configureExecutorContainer>
Allow user values for CPU and MEMORY limits to override those provided by
Heron.
* [Scheduler-Core] <LaunchRunner> handling <submit> errors better.
Some Schedulers, such as K8s, throw exceptions instead of returning false
when <submit> fails. This leaves the Topology Manager with dangling references.
An addition RPC call to the Scheduler is required to completely clear the state.
* [Kubernetes] <V1Controller>
General cleanup in tests and class.
* [Kubernetes] code review changes.
Code review from @nwangtw.
<KubernetesContext.getPodTemplateConfigMapDisabled> switched to
<equalsIgnoreCase>.
* [Scheduler-Core] code review changes.
Code review from @nwangtw.
<LaunchRunner> error message assembly improved.
<LaunchRunner> added <FINE> level logging for failure to clear failed
topology launch from Scheduler.
* [Tests] <configureContainerPorts>.
* [Kubernetes] <API Server> configs.
Code review from @nwangtw, @nicknezis.
Updated command to disable Pod Templates to <false> by default.
* [Kubernetes] <configureContainerEnvVars>
Logic for merging environment variables extracted to a method for testing.
* [Tests] <configureContainerEnvVars>.
* [Kubernetes] <configureExecutorContainer>
Wired in <configureContainerEnvVars> and removed old code.
* Update for Helm chart
* Updated version to match the other k8s ClusterRoleBindings
* [Kubernetes] <configureContainerResources>
Logic to configure container's resources extracted to method to facilitate
testing.
* [Kubernetes] <configureExecutorContainer>
Removed old logic and wired <configureContainerResources> into
<configureExecutorContainer>.
* [Tests] <testConfigureContainerPorts>.
Added a test for debugging ports.
* [Kubernetes] <addVolumesIfPresent>.
Exposed for testing.
* [Tests] <addVolumesIfPresent>.
Testing on a <hostPath> volume but will generalise across others.
* [Kubernetes] <mountVolumeIfPresent>.
Exposed for testing.
* [Tests] <mountVolumeIfPresent>.
Tested by setting a Volume Mount in the Config and then a custom Volume
Mount in the container.
* [Tests] <addVolumesIfPresent>.
Cleaned up tests.
* [Tests] <mountVolumesIfPresent>.
Testing for when no Volume Mounts should be set.
* [Tests] <addVolumesIfPresent>.
Testing for when no Volumes should be set.
* Attempt to fix Travis CI build
* [Tests] <configureContainerEnvVars> <configureContainerPorts>.
Extracted logic to generate executor environment variables, ports, and
debugging ports. This is to resolve production-testing code inconsistencies
which may arise.
* [Tests] <V1ControllerTests>
General cleanup and simplification of test suite.
* Travis fix take 3
* Travis CI fix
* [Kubernetes] <V1ControllerUtils>
Added nested utility class to improve code maintainability.
<mergeListDedupe> will merge two input lists by keeping all values in one
and deduplicating the second list.
* [Tests] <mergeListDedupe>.
Full battery of tests null lists, merged lists, and thrown errors.
* [Kubernetes] <V1Controller>.
Switched to using <mergeListsDedupe> to improve code maintainability.
Effects:
<addVolumesIfPresent>
<configureContainerEnvVars>
<configureContainerPorts>
<mountVolumeIfPresent>
* [Kubernetes] <V1Controller> cleaned up unneeded returns when using setter
methods.
* [Kubernetes] <V1Controller>.
Merging Pod Specification Tolerations and deduplicating on the
<V1Tolerations::key>.
* [Tests] <configureTolerations>.
Test for a null, empty, and merging of Toleration lists.
* [Kubernetes] <configurePodSpec>.
Wired in <configureTolerations>
* [Tests] cleaning up code.
* [Kubernetes] <configurePodSpec>.
Added check for multiple executor container specs in Pod Template. Will
throw error if detected.
* [Tests] <V1Controller> general cleanup.
* [Kubernetes] Constants
Updated tolerations to remove deprecated taints.
* [Kubernetes] <V1Controller>
<getConfigMap> retrieving a single named ConfigMap in a specific namespace.
<loadPodFromTemplate> logic updated to handle a single ConfigMap.
* [Tests] <V1Controller>
Fixed and cleaned up tests after switching to <readNamespacedConfigMap>.
* [Kubernetes] <V1Controller>
Error message cleanup.
* [Tests] <V1Controller>
Test description cleanup.
* [Kubernetes] <KubernetesUtils>
Javadoc cleanup.
* [Tests] <KubernetesUtils>
Test description cleanup.
* [Tests] <V1Controller>
<configureContainerResources> Heron values take precedence for limits.
* [Kubernetes] <V1Controller>
<configureContainerResources> Heron values take precedence for limits.
* Add support for reading configmap
* Removed deprecated k8s tolerations
Co-authored-by: Nicholas Nezis <[email protected]>
---
.travis.yml | 9 +-
deploy/kubernetes/general/apiserver.yaml | 9 +-
deploy/kubernetes/general/tools.yaml | 6 +-
deploy/kubernetes/gke/gcs-apiserver.yaml | 8 +-
deploy/kubernetes/helm/templates/tools.yaml | 14 +-
deploy/kubernetes/helm/values.yaml.template | 3 +
deploy/kubernetes/minikube/apiserver.yaml | 3 +-
.../org/apache/heron/scheduler/LaunchRunner.java | 47 +-
.../scheduler/kubernetes/KubernetesConstants.java | 5 +-
.../scheduler/kubernetes/KubernetesContext.java | 15 +
.../scheduler/kubernetes/KubernetesUtils.java | 40 +-
.../heron/scheduler/kubernetes/V1Controller.java | 398 ++++++++---
heron/schedulers/tests/java/BUILD | 7 +-
.../kubernetes/KubernetesContextTest.java | 62 ++
.../scheduler/kubernetes/KubernetesUtilsTest.java | 119 ++++
.../scheduler/kubernetes/V1ControllerTest.java | 740 +++++++++++++++++++++
16 files changed, 1352 insertions(+), 133 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index e608fa8..532a335 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,6 +18,9 @@ addons:
- libcppunit-dev
- pkg-config
- python3-dev
+ - python3-pip
+ - python3-setuptools
+ - python3-wheel
- python3-venv
- wget
- zip
@@ -34,10 +37,6 @@ before_install:
- chmod +x bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh
- ./bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh --user
-install:
- - sudo apt-get install python3-pip python3-setuptools
- - pip3 install travis-wait-improved
-
script:
- which gcc
- gcc --version
@@ -47,4 +46,4 @@ script:
- python -V
- which python3
- python3 -V
- - travis-wait-improved --timeout=180m scripts/travis/ci.sh
+ - scripts/travis/ci.sh
diff --git a/deploy/kubernetes/general/apiserver.yaml
b/deploy/kubernetes/general/apiserver.yaml
index aa5b93e..33c9533 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -27,7 +27,7 @@ metadata:
namespace: default
---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: heron-apiserver
@@ -67,11 +67,7 @@ spec:
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/notReady"
- operator: "Equal"
- effect: "NoExecute"
- tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/unreachable"
+ - key: "node.kubernetes.io/unreachable"
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
@@ -95,6 +91,7 @@ spec:
-D
heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/heron
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
+ -D heron.kubernetes.pod.template.configmap.disabled=false
---
apiVersion: v1
diff --git a/deploy/kubernetes/general/tools.yaml
b/deploy/kubernetes/general/tools.yaml
index b650e04..d60790a 100644
--- a/deploy/kubernetes/general/tools.yaml
+++ b/deploy/kubernetes/general/tools.yaml
@@ -38,11 +38,7 @@ spec:
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/notReady"
- operator: "Equal"
- effect: "NoExecute"
- tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/unreachable"
+ - key: "node.kubernetes.io/unreachable"
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
diff --git a/deploy/kubernetes/gke/gcs-apiserver.yaml
b/deploy/kubernetes/gke/gcs-apiserver.yaml
index 2a84be1..3c98674 100644
--- a/deploy/kubernetes/gke/gcs-apiserver.yaml
+++ b/deploy/kubernetes/gke/gcs-apiserver.yaml
@@ -27,7 +27,7 @@ metadata:
namespace: default
---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: heron-apiserver
@@ -67,11 +67,7 @@ spec:
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/notReady"
- operator: "Equal"
- effect: "NoExecute"
- tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/unreachable"
+ - key: "node.kubernetes.io/unreachable"
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
diff --git a/deploy/kubernetes/helm/templates/tools.yaml
b/deploy/kubernetes/helm/templates/tools.yaml
index 0a6fe15..123f0e2 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -56,11 +56,7 @@ spec:
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/notReady"
- operator: "Equal"
- effect: "NoExecute"
- tolerationSeconds: 10
- - key: "node.alpha.kubernetes.io/unreachable"
+ - key: "node.kubernetes.io/unreachable"
operator: "Equal"
effect: "NoExecute"
tolerationSeconds: 10
@@ -162,6 +158,7 @@ spec:
-D
heron.class.repacking.algorithm=org.apache.heron.packing.binpacking.FirstFitDecreasingPacking
{{- end }}
-D heron.kubernetes.resource.request.mode={{
.Values.topologyResourceRequestMode }}
+ -D heron.kubernetes.pod.template.configmap.disabled={{
.Values.disablePodTemplates }}
envFrom:
- configMapRef:
name: {{ .Release.Name }}-tools-config
@@ -265,6 +262,13 @@ rules:
- patch
- update
- watch
+- apiGroups:
+ - ""
+ resources:
+ - configmaps
+ verbs:
+ - get
+ - list
---
apiVersion: v1
diff --git a/deploy/kubernetes/helm/values.yaml.template
b/deploy/kubernetes/helm/values.yaml.template
index cf0154d..61daf04 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -58,6 +58,9 @@ uploader:
# Packing algorithms
packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
+# Support for ConfigMap mounted PodTemplates
+disablePodTemplates: false
+
# Number of replicas for storage bookies, memory and storage requirements
bookieReplicas: 3
bookieCpuMin: 100m
diff --git a/deploy/kubernetes/minikube/apiserver.yaml
b/deploy/kubernetes/minikube/apiserver.yaml
index 2cbf363..8c08cc9 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -28,7 +28,7 @@ metadata:
namespace: default
---
-apiVersion: rbac.authorization.k8s.io/v1beta1
+apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: heron-apiserver
@@ -82,6 +82,7 @@ spec:
-D
heron.uploader.dlog.topologies.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
+ -D heron.kubernetes.pod.template.configmap.disabled=false
---
apiVersion: v1
diff --git
a/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
b/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
index e937443..1b78667 100644
--- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
+++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/LaunchRunner.java
@@ -19,11 +19,15 @@
package org.apache.heron.scheduler;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.proto.scheduler.Scheduler;
import org.apache.heron.proto.system.ExecutionEnvironment;
import org.apache.heron.proto.system.PackingPlans;
+import org.apache.heron.scheduler.client.ISchedulerClient;
+import org.apache.heron.scheduler.client.SchedulerClientFactory;
import org.apache.heron.scheduler.dryrun.SubmitDryRunResponse;
import org.apache.heron.scheduler.utils.LauncherUtils;
import org.apache.heron.scheduler.utils.Runtime;
@@ -169,13 +173,48 @@ public class LaunchRunner {
"Failed to set execution state for topology '%s'", topologyName));
}
- // launch the topology, clear the state if it fails
- if (!launcher.launch(packedPlan)) {
+ // Launch the topology, clear the state if it fails. Some schedulers throw
exceptions instead of
+ // returning false. In some cases the scheduler needs to have the topology
deleted.
+ try {
+ if (!launcher.launch(packedPlan)) {
+ throw new TopologySubmissionException(null);
+ }
+ } catch (TopologySubmissionException e) {
+ // Compile error message to throw.
+ final StringBuilder errorMessage = new StringBuilder(
+ String.format("Failed to launch topology '%s'", topologyName));
+ if (e.getMessage() != null) {
+ errorMessage.append("\n").append(e.getMessage());
+ }
+
+ try {
+ // Clear state from the Scheduler via RPC.
+ Scheduler.KillTopologyRequest killTopologyRequest =
Scheduler.KillTopologyRequest
+ .newBuilder()
+ .setTopologyName(topologyName).build();
+
+ ISchedulerClient schedulerClient = new SchedulerClientFactory(config,
runtime)
+ .getSchedulerClient();
+ if (!schedulerClient.killTopology(killTopologyRequest)) {
+ final String logMessage =
+ String.format("Failed to remove topology '%s' from scheduler
after failed submit. "
+ + "Please re-try the kill command.", topologyName);
+ errorMessage.append("\n").append(logMessage);
+ LOG.log(Level.SEVERE, logMessage);
+ }
+ // SUPPRESS CHECKSTYLE IllegalCatch
+ } catch (Exception ignored){
+ // The above call to clear the Scheduler may fail. This situation can
be ignored.
+ LOG.log(Level.FINE,
+ String.format("Failure clearing failed topology `%s` from
Scheduler during `submit`",
+ topologyName));
+ }
+
+ // Clear state from the State Manager.
statemgr.deleteExecutionState(topologyName);
statemgr.deletePackingPlan(topologyName);
statemgr.deleteTopology(topologyName);
- throw new LauncherException(String.format(
- "Failed to launch topology '%s'", topologyName));
+ throw new LauncherException(errorMessage.toString());
}
}
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index be45918..5e7b19a 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -35,6 +35,8 @@ public final class KubernetesConstants {
public static final String MEMORY = "memory";
public static final String CPU = "cpu";
+ public static final String EXECUTOR_NAME = "executor";
+
// container env constants
public static final String ENV_HOST = "HOST";
public static final String POD_IP = "status.podIP";
@@ -102,8 +104,7 @@ public final class KubernetesConstants {
static final List<String> TOLERATIONS = Collections.unmodifiableList(
Arrays.asList(
"node.kubernetes.io/not-ready",
- "node.alpha.kubernetes.io/notReady",
- "node.alpha.kubernetes.io/unreachable"
+ "node.kubernetes.io/unreachable"
)
);
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 71a660a..6d29b72 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -82,6 +82,12 @@ public final class KubernetesContext extends Context {
public static final String KUBERNETES_VOLUME_AWS_EBS_FS_TYPE =
"heron.kubernetes.volume.awsElasticBlockStore.fsType";
+ // pod template configmap
+ public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+ "heron.kubernetes.pod.template.configmap.name";
+ public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED =
+ "heron.kubernetes.pod.template.configmap.disabled";
+
// container mount volume mount keys
public static final String KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME =
"heron.kubernetes.container.volumeMount.name";
@@ -172,6 +178,15 @@ public final class KubernetesContext extends Context {
return config.getStringValue(KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH);
}
+ public static String getPodTemplateConfigMapName(Config config) {
+ return config.getStringValue(KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME);
+ }
+
+ public static boolean getPodTemplateConfigMapDisabled(Config config) {
+ final String disabled =
config.getStringValue(KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED);
+ return "true".equalsIgnoreCase(disabled);
+ }
+
public static Map<String, String> getPodLabels(Config config) {
return getConfigItemsByPrefix(config, KUBERNETES_POD_LABEL_PREFIX);
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index 5f963fd..a75e00c 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -20,17 +20,22 @@
package org.apache.heron.scheduler.kubernetes;
import java.io.IOException;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.SysUtils;
+import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
import io.kubernetes.client.openapi.ApiException;
-
import okhttp3.Response;
final class KubernetesUtils {
@@ -84,4 +89,37 @@ final class KubernetesUtils {
static String Megabytes(ByteAmount amount) {
return String.format("%sMi", Long.toString(amount.asMegabytes()));
}
+
+ static class V1ControllerUtils<T> {
+ private static final Logger LOG =
Logger.getLogger(V1Controller.class.getName());
+
+ /**
+ * Merge two lists by keeping all values in the <code>primaryList</code>
and de-duplicating values in
+ * <code>secondaryList</code> using the <code>comparator</code>.
+ * @param primaryList All the values in this will be retained.
+ * @param secondaryList The values in this list will be deduplicated
against <code>primaryList</code>.
+ * @param comparator Used to compare keys in the <code>TreeSet</code> to
find their insertion position.
+ * @param description Description of the list merge operation which is
used for error messages.
+ * @return A de-duplicated list of all the values in both input lists
using the <code>comparator</code>.
+ */
+ protected List<T> mergeListsDedupe(List<T> primaryList, List<T>
secondaryList,
+ Comparator<T> comparator, String
description) {
+ if (primaryList == null || primaryList.isEmpty()) {
+ return secondaryList;
+ }
+ if (secondaryList == null || secondaryList.isEmpty()) {
+ return primaryList;
+ }
+ try {
+ Set<T> treeSet = new TreeSet<>(comparator);
+ treeSet.addAll(primaryList);
+ treeSet.addAll(secondaryList);
+ return new LinkedList<>(treeSet);
+ } catch (NullPointerException e) {
+ final String message = String.format("Failed to merge lists for %s",
description);
+ LOG.log(Level.FINE, message);
+ throw new TopologySubmissionException(message);
+ }
+ }
+ }
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 747ed3c..761d615 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -34,7 +35,10 @@ import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.Pair;
import org.apache.heron.scheduler.TopologyRuntimeManagementException;
import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.scheduler.utils.Runtime;
@@ -51,6 +55,7 @@ import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -59,6 +64,7 @@ import io.kubernetes.client.openapi.models.V1LabelSelector;
import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplate;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1SecretKeySelector;
@@ -71,7 +77,7 @@ import io.kubernetes.client.openapi.models.V1Toleration;
import io.kubernetes.client.openapi.models.V1Volume;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import io.kubernetes.client.util.PatchUtils;
-
+import io.kubernetes.client.util.Yaml;
import okhttp3.Response;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
@@ -83,11 +89,18 @@ public class V1Controller extends KubernetesController {
private static final String ENV_SHARD_ID = "SHARD_ID";
+ private final boolean isPodTemplateDisabled;
+
private final AppsV1Api appsClient;
private final CoreV1Api coreClient;
V1Controller(Config configuration, Config runtimeConfiguration) {
super(configuration, runtimeConfiguration);
+
+ isPodTemplateDisabled =
KubernetesContext.getPodTemplateConfigMapDisabled(configuration);
+ LOG.log(Level.WARNING, String.format("Custom Pod Templates are %s",
+ isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+
try {
final ApiClient apiClient =
io.kubernetes.client.util.Config.defaultClient();
Configuration.setDefaultApiClient(apiClient);
@@ -338,17 +351,17 @@ public class V1Controller extends KubernetesController {
final V1Service service = new V1Service();
- // setup service metadata
- final V1ObjectMeta objectMeta = new V1ObjectMeta();
- objectMeta.name(topologyName);
- objectMeta.annotations(getServiceAnnotations());
- objectMeta.setLabels(getServiceLabels());
+ // Setup service metadata.
+ final V1ObjectMeta objectMeta = new V1ObjectMeta()
+ .name(topologyName)
+ .annotations(getServiceAnnotations())
+ .labels(getServiceLabels());
service.setMetadata(objectMeta);
- // create the headless service
- final V1ServiceSpec serviceSpec = new V1ServiceSpec();
- serviceSpec.clusterIP("None");
- serviceSpec.setSelector(getPodMatchLabels(topologyName));
+ // Create the headless service.
+ final V1ServiceSpec serviceSpec = new V1ServiceSpec()
+ .clusterIP("None")
+ .selector(getPodMatchLabels(topologyName));
service.setSpec(serviceSpec);
@@ -361,44 +374,44 @@ public class V1Controller extends KubernetesController {
final V1StatefulSet statefulSet = new V1StatefulSet();
- // setup stateful set metadata
- final V1ObjectMeta objectMeta = new V1ObjectMeta();
- objectMeta.name(topologyName);
- statefulSet.metadata(objectMeta);
+ // Setup StatefulSet's metadata.
+ final V1ObjectMeta objectMeta = new V1ObjectMeta()
+ .name(topologyName);
+ statefulSet.setMetadata(objectMeta);
- // create the stateful set spec
- final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec();
- statefulSetSpec.serviceName(topologyName);
-
statefulSetSpec.setReplicas(Runtime.numContainers(runtimeConfiguration).intValue());
+ // Create the stateful set spec.
+ final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec()
+ .serviceName(topologyName)
+ .replicas(Runtime.numContainers(runtimeConfiguration).intValue());
// Parallel pod management tells the StatefulSet controller to launch or
terminate
// all Pods in parallel, and not to wait for Pods to become Running and
Ready or completely
// terminated prior to launching or terminating another Pod.
statefulSetSpec.setPodManagementPolicy("Parallel");
- // add selector match labels "app=heron" and "topology=topology-name"
- // so the we know which pods to manage
- final V1LabelSelector selector = new V1LabelSelector();
- selector.matchLabels(getPodMatchLabels(topologyName));
- statefulSetSpec.selector(selector);
+ // Add selector match labels "app=heron" and "topology=topology-name"
+ // so we know which pods to manage.
+ final V1LabelSelector selector = new V1LabelSelector()
+ .matchLabels(getPodMatchLabels(topologyName));
+ statefulSetSpec.setSelector(selector);
// create a pod template
- final V1PodTemplateSpec podTemplateSpec = new V1PodTemplateSpec();
+ final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate();
// set up pod meta
final V1ObjectMeta templateMetaData = new
V1ObjectMeta().labels(getPodLabels(topologyName));
Map<String, String> annotations = new HashMap<>();
annotations.putAll(getPodAnnotations());
annotations.putAll(getPrometheusAnnotations());
- templateMetaData.annotations(annotations);
+ templateMetaData.setAnnotations(annotations);
podTemplateSpec.setMetadata(templateMetaData);
final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID,
numberOfInstances);
- podTemplateSpec.spec(getPodSpec(command, containerResource,
numberOfInstances));
+ configurePodSpec(podTemplateSpec, command, containerResource,
numberOfInstances);
statefulSetSpec.setTemplate(podTemplateSpec);
- statefulSet.spec(statefulSetSpec);
+ statefulSet.setSpec(statefulSetSpec);
return statefulSet;
}
@@ -441,28 +454,67 @@ public class V1Controller extends KubernetesController {
return KubernetesContext.getServiceLabels(getConfiguration());
}
- private V1PodSpec getPodSpec(List<String> executorCommand, Resource resource,
- int numberOfInstances) {
- final V1PodSpec podSpec = new V1PodSpec();
+ private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec,
+ List<String> executorCommand,
+ Resource resource,
+ int numberOfInstances) {
+ if (podTemplateSpec.getSpec() == null) {
+ podTemplateSpec.setSpec(new V1PodSpec());
+ }
+ final V1PodSpec podSpec = podTemplateSpec.getSpec();
// set the termination period to 0 so pods can be deleted quickly
podSpec.setTerminationGracePeriodSeconds(0L);
// set the pod tolerations so pods are rescheduled when nodes go down
//
https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
- podSpec.setTolerations(getTolerations());
+ configureTolerations(podSpec);
+
+ // Get <executor> container and discard all others.
+ final String executorName = KubernetesConstants.EXECUTOR_NAME;
+ V1Container executorContainer = null;
+ List<V1Container> containers = podSpec.getContainers();
+ if (containers != null) {
+ for (V1Container container : containers) {
+ final String name = container.getName();
+ if (name != null && name.equals(executorName)) {
+ if (executorContainer != null) {
+ throw new TopologySubmissionException(
+ String.format("Multiple configurations found for %s
container", executorName));
+ }
+ executorContainer = container;
+ }
+ }
+ } else {
+ containers = new LinkedList<>();
+ }
- podSpec.containers(Collections.singletonList(
- getContainer(executorCommand, resource, numberOfInstances)));
+ if (executorContainer == null) {
+ executorContainer = new V1Container().name(executorName);
+ containers.add(executorContainer);
+ }
+
+ configureExecutorContainer(executorCommand, resource, numberOfInstances,
executorContainer);
+
+ podSpec.setContainers(containers);
addVolumesIfPresent(podSpec);
mountSecretsAsVolumes(podSpec);
+ }
- return podSpec;
+ @VisibleForTesting
+ protected void configureTolerations(final V1PodSpec spec) {
+ KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ spec.setTolerations(
+ utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
+ Comparator.comparing(V1Toleration::getKey), "Pod Specification
Tolerations")
+ );
}
- private List<V1Toleration> getTolerations() {
+ @VisibleForTesting
+ protected static List<V1Toleration> getTolerations() {
final List<V1Toleration> tolerations = new ArrayList<>();
KubernetesConstants.TOLERATIONS.forEach(t -> {
final V1Toleration toleration =
@@ -477,13 +529,20 @@ public class V1Controller extends KubernetesController {
return tolerations;
}
- private void addVolumesIfPresent(V1PodSpec spec) {
+ @VisibleForTesting
+ protected void addVolumesIfPresent(final V1PodSpec spec) {
final Config config = getConfiguration();
if (KubernetesContext.hasVolume(config)) {
- final V1Volume volume = Volumes.get().create(config);
- if (volume != null) {
- LOG.fine("Adding volume: " + volume.toString());
- spec.volumes(Collections.singletonList(volume));
+ final V1Volume volumeFromConfig = Volumes.get().create(config);
+ if (volumeFromConfig != null) {
+ // Merge volumes. Deduplicate using volume's name with Heron defaults
taking precedence.
+ KubernetesUtils.V1ControllerUtils<V1Volume> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ spec.setVolumes(
+
utils.mergeListsDedupe(Collections.singletonList(volumeFromConfig),
spec.getVolumes(),
+ Comparator.comparing(V1Volume::getName), "Pod Template
Volumes")
+ );
+ LOG.fine("Adding volume: " + volumeFromConfig);
}
}
}
@@ -507,52 +566,62 @@ public class V1Controller extends KubernetesController {
}
}
- private V1Container getContainer(List<String> executorCommand, Resource
resource,
- int numberOfInstances) {
+ private void configureExecutorContainer(List<String> executorCommand,
Resource resource,
+ int numberOfInstances, final
V1Container container) {
final Config configuration = getConfiguration();
- final V1Container container = new V1Container().name("executor");
- // set up the container images
+ // Set up the container images.
container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
- // set up the container command
+ // Set up the container command.
container.setCommand(executorCommand);
if (KubernetesContext.hasImagePullPolicy(configuration)) {
container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration));
}
- // setup the environment variables for the container
- final V1EnvVar envVarHost = new V1EnvVar();
- envVarHost.name(KubernetesConstants.ENV_HOST)
- .valueFrom(new V1EnvVarSource()
- .fieldRef(new V1ObjectFieldSelector()
- .fieldPath(KubernetesConstants.POD_IP)));
-
- final V1EnvVar envVarPodName = new V1EnvVar();
- envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
- .valueFrom(new V1EnvVarSource()
- .fieldRef(new V1ObjectFieldSelector()
- .fieldPath(KubernetesConstants.POD_NAME)));
- container.addEnvItem(envVarHost);
- container.addEnvItem(envVarPodName);
+ // Configure environment variables.
+ configureContainerEnvVars(container);
+ // Set secret keys.
setSecretKeyRefs(container);
- // set container resources
- final V1ResourceRequirements resourceRequirements = new
V1ResourceRequirements();
- // Set the Kubernetes container resource limit
- final Map<String, Quantity> limits = new HashMap<>();
+ // Set container resources
+ configureContainerResources(container, configuration, resource);
+
+ // Set container ports.
+ final boolean debuggingEnabled =
+ TopologyUtils.getTopologyRemoteDebuggingEnabled(
+ Runtime.topology(getRuntimeConfiguration()));
+ configureContainerPorts(debuggingEnabled, numberOfInstances, container);
+
+ // setup volume mounts
+ mountVolumeIfPresent(container);
+ }
+
+ @VisibleForTesting
+ protected void configureContainerResources(final V1Container container,
+ final Config configuration, final
Resource resource) {
+ if (container.getResources() == null) {
+ container.setResources(new V1ResourceRequirements());
+ }
+ final V1ResourceRequirements resourceRequirements =
container.getResources();
+
+ // Configure resource limits. Deduplicate on limit name with user values
taking precedence.
+ if (resourceRequirements.getLimits() == null) {
+ resourceRequirements.setLimits(new HashMap<>());
+ }
+ final Map<String, Quantity> limits = resourceRequirements.getLimits();
limits.put(KubernetesConstants.MEMORY,
- Quantity.fromString(KubernetesUtils.Megabytes(
- resource.getRam())));
+ Quantity.fromString(KubernetesUtils.Megabytes(
+ resource.getRam())));
limits.put(KubernetesConstants.CPU,
- Quantity.fromString(Double.toString(roundDecimal(
- resource.getCpu(), 3))));
- resourceRequirements.setLimits(limits);
+ Quantity.fromString(Double.toString(roundDecimal(
+ resource.getCpu(), 3))));
+
+ // Set the Kubernetes container resource request.
KubernetesContext.KubernetesResourceRequestMode requestMode =
- KubernetesContext.getKubernetesRequestMode(configuration);
- // Set the Kubernetes container resource request
+ KubernetesContext.getKubernetesRequestMode(configuration);
if (requestMode ==
KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
resourceRequirements.setRequests(limits);
@@ -560,51 +629,95 @@ public class V1Controller extends KubernetesController {
LOG.log(Level.CONFIG, "Not setting K8s request because config was
NOT_SET");
}
container.setResources(resourceRequirements);
+ }
- // set container ports
- final boolean debuggingEnabled =
- TopologyUtils.getTopologyRemoteDebuggingEnabled(
- Runtime.topology(getRuntimeConfiguration()));
- container.setPorts(getContainerPorts(debuggingEnabled, numberOfInstances));
+ @VisibleForTesting
+ protected void configureContainerEnvVars(final V1Container container) {
+ // Deduplicate on var name with Heron defaults take precedence.
+ KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new
KubernetesUtils.V1ControllerUtils<>();
+ container.setEnv(
+ utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
+ Comparator.comparing(V1EnvVar::getName), "Pod Template Environment
Variables")
+ );
+ }
- // setup volume mounts
- mountVolumeIfPresent(container);
+ @VisibleForTesting
+ protected static List<V1EnvVar> getExecutorEnvVars() {
+ final V1EnvVar envVarHost = new V1EnvVar();
+ envVarHost.name(KubernetesConstants.ENV_HOST)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath(KubernetesConstants.POD_IP)));
- return container;
+ final V1EnvVar envVarPodName = new V1EnvVar();
+ envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath(KubernetesConstants.POD_NAME)));
+
+ return Arrays.asList(envVarHost, envVarPodName);
}
- private List<V1ContainerPort> getContainerPorts(boolean remoteDebugEnabled,
- int numberOfInstances) {
- List<V1ContainerPort> ports = new ArrayList<>();
- KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
- final V1ContainerPort port = new V1ContainerPort();
- port.setName(p.getName());
- port.setContainerPort(v);
- ports.add(port);
- });
+ @VisibleForTesting
+ protected void configureContainerPorts(boolean remoteDebugEnabled, int
numberOfInstances,
+ final V1Container container) {
+ List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts());
if (remoteDebugEnabled) {
- IntStream.range(0, numberOfInstances).forEach(i -> {
- final String portName =
- KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
- final V1ContainerPort port = new V1ContainerPort();
- port.setName(portName);
- port.setContainerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT +
i);
- ports.add(port);
- });
+ ports.addAll(getDebuggingPorts(numberOfInstances));
}
+ // Set container ports. Deduplicate using port number with Heron defaults
taking precedence.
+ KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ container.setPorts(
+ utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
+ Comparator.comparing(V1ContainerPort::getContainerPort), "Pod
Template Ports")
+ );
+ }
+
+ @VisibleForTesting
+ protected static List<V1ContainerPort> getExecutorPorts() {
+ List<V1ContainerPort> ports = new LinkedList<>();
+ KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
+ final V1ContainerPort port = new V1ContainerPort()
+ .name(p.getName())
+ .containerPort(v);
+ ports.add(port);
+ });
+ return ports;
+ }
+
+ @VisibleForTesting
+ protected static List<V1ContainerPort> getDebuggingPorts(int
numberOfInstances) {
+ List<V1ContainerPort> ports = new LinkedList<>();
+ IntStream.range(0, numberOfInstances).forEach(i -> {
+ final String portName =
+ KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
+ final V1ContainerPort port = new V1ContainerPort()
+ .name(portName)
+ .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
+ ports.add(port);
+ });
return ports;
}
- private void mountVolumeIfPresent(V1Container container) {
+ @VisibleForTesting
+ protected void mountVolumeIfPresent(final V1Container container) {
final Config config = getConfiguration();
if (KubernetesContext.hasContainerVolume(config)) {
final V1VolumeMount mount =
new V1VolumeMount()
.name(KubernetesContext.getContainerVolumeName(config))
.mountPath(KubernetesContext.getContainerVolumeMountPath(config));
- container.volumeMounts(Collections.singletonList(mount));
+
+ // Merge volume mounts. Deduplicate using mount's name with Heron
defaults taking precedence.
+ KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ container.setVolumeMounts(
+ utils.mergeListsDedupe(Collections.singletonList(mount),
container.getVolumeMounts(),
+ Comparator.comparing(V1VolumeMount::getName), "Pod Template
Volume Mounts")
+ );
}
}
@@ -635,4 +748,95 @@ public class V1Controller extends KubernetesController {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
+
+ @VisibleForTesting
+ protected V1PodTemplateSpec loadPodFromTemplate() {
+ final Pair<String, String> podTemplateConfigMapName =
getPodTemplateLocation();
+
+ // Default Pod Template.
+ if (podTemplateConfigMapName == null) {
+ LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
+ return new V1PodTemplateSpec();
+ }
+
+ if (isPodTemplateDisabled) {
+ throw new TopologySubmissionException("Custom Pod Templates are
disabled");
+ }
+
+ final String configMapName = podTemplateConfigMapName.first;
+ final String podTemplateName = podTemplateConfigMapName.second;
+
+ // Attempt to locate ConfigMap with provided Pod Template name.
+ try {
+ V1ConfigMap configMap = getConfigMap(configMapName);
+ if (configMap == null) {
+ throw new ApiException(
+ String.format("K8s client unable to locate ConfigMap '%s'",
configMapName));
+ }
+
+ final Map<String, String> configMapData = configMap.getData();
+ if (configMapData != null && configMapData.containsKey(podTemplateName))
{
+ // NullPointerException when Pod Template is empty.
+ V1PodTemplateSpec podTemplate = ((V1PodTemplate)
+ Yaml.load(configMapData.get(podTemplateName))).getTemplate();
+ LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s
Pod Template",
+ configMapName, podTemplateName));
+ return podTemplate;
+ }
+
+ // Failure to locate Pod Template with provided name.
+ throw new ApiException(String.format("Failed to locate Pod Template '%s'
in ConfigMap '%s'",
+ podTemplateName, configMapName));
+ } catch (ApiException e) {
+ KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
+ throw new TopologySubmissionException(e.getMessage());
+ } catch (IOException | ClassCastException | NullPointerException e) {
+ final String message = String.format("Error parsing Pod Template '%s' in
ConfigMap '%s'",
+ podTemplateName, configMapName);
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+ }
+
+ @VisibleForTesting
+ protected Pair<String, String> getPodTemplateLocation() {
+ final String podTemplateConfigMapName = KubernetesContext
+ .getPodTemplateConfigMapName(getConfiguration());
+
+ if (podTemplateConfigMapName == null) {
+ return null;
+ }
+
+ try {
+ final int splitPoint = podTemplateConfigMapName.indexOf(".");
+ final String configMapName = podTemplateConfigMapName.substring(0,
splitPoint);
+ final String podTemplateName =
podTemplateConfigMapName.substring(splitPoint + 1);
+
+ if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
+ throw new IllegalArgumentException("Empty ConfigMap or Pod Template
name");
+ }
+
+ return new Pair<>(configMapName, podTemplateName);
+ } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+ final String message = "Invalid ConfigMap and/or Pod Template name";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+ }
+
+ @VisibleForTesting
+ protected V1ConfigMap getConfigMap(String configMapName) {
+ try {
+ return coreClient.readNamespacedConfigMap(
+ configMapName,
+ getNamespace(),
+ null,
+ null,
+ null);
+ } catch (ApiException e) {
+ final String message = "Error retrieving ConfigMaps";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(String.format("%s: %s", message,
e.getMessage()));
+ }
+ }
}
diff --git a/heron/schedulers/tests/java/BUILD
b/heron/schedulers/tests/java/BUILD
index 9bbe49b..70ded82 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -192,7 +192,9 @@ java_tests(
java_library(
name = "kubernetes-tests",
srcs = glob(["**/kubernetes/*.java"]),
- deps = scheduler_deps_files + kubernetes_deps_files,
+ deps = scheduler_deps_files + kubernetes_deps_files + [
+ "@maven//:org_apache_commons_commons_collections4",
+ ],
)
java_tests(
@@ -202,6 +204,9 @@ java_tests(
"org.apache.heron.scheduler.kubernetes.KubernetesControllerTest",
"org.apache.heron.scheduler.kubernetes.KubernetesLauncherTest",
"org.apache.heron.scheduler.kubernetes.VolumesTests",
+ "org.apache.heron.scheduler.kubernetes.KubernetesContextTest",
+ "org.apache.heron.scheduler.kubernetes.V1ControllerTest",
+ "org.apache.heron.scheduler.kubernetes.KubernetesUtilsTest",
],
runtime_deps = [":kubernetes-tests"],
)
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
new file mode 100644
index 0000000..9a762d6
--- /dev/null
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.heron.spi.common.Config;
+
+public class KubernetesContextTest {
+
+ public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+ "heron.kubernetes.pod.template.configmap.name";
+ private static final String POD_TEMPLATE_CONFIGMAP_NAME =
"pod-template-configmap-name";
+ private final Config config = Config.newBuilder().build();
+ private final Config configWithPodTemplateConfigMap = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ POD_TEMPLATE_CONFIGMAP_NAME)
+ .build();
+
+ @Test
+ public void testPodTemplateConfigMapName() {
+
Assert.assertEquals(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME);
+ Assert.assertEquals(
+
KubernetesContext.getPodTemplateConfigMapName(configWithPodTemplateConfigMap),
+ POD_TEMPLATE_CONFIGMAP_NAME);
+ Assert.assertNull(KubernetesContext.getPodTemplateConfigMapName(config));
+ }
+
+ @Test
+ public void testPodTemplateConfigMapDisabled() {
+
Assert.assertFalse(KubernetesContext.getPodTemplateConfigMapDisabled(config));
+ Assert.assertFalse(KubernetesContext
+ .getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMap));
+
+ final Config configWithPodTemplateConfigMapOff = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ POD_TEMPLATE_CONFIGMAP_NAME)
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED,
"TRUE")
+ .build();
+ Assert.assertTrue(KubernetesContext
+ .getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMapOff));
+ }
+}
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
new file mode 100644
index 0000000..ddc1f69
--- /dev/null
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.heron.scheduler.TopologySubmissionException;
+
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KubernetesUtilsTest {
+
+ @Test
+ public void testMergeListsDedupe() {
+ final String description = "Pod Template Environment Variables";
+ final List<V1EnvVar> heronEnvVars =
+ Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+ final V1EnvVar additionEnvVar = new V1EnvVar()
+ .name("env-variable-to-be-kept")
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-variable-was-kept")));
+ final List<V1EnvVar> expectedEnvVars = Collections.unmodifiableList(
+ new LinkedList<V1EnvVar>(V1Controller.getExecutorEnvVars()) {{
+ add(additionEnvVar);
+ }});
+ final List<V1EnvVar> inputEnvVars = Arrays.asList(
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_HOST)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-host-to-be-replaced"))),
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_POD_NAME)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("pod-name-to-be-replaced"))),
+ additionEnvVar
+ );
+
+ KubernetesUtils.V1ControllerUtils<V1EnvVar> v1ControllerUtils =
+ new KubernetesUtils.V1ControllerUtils<>();
+
+ // Both input lists are null.
+ Assert.assertNull("Both input lists are <null>",
+ v1ControllerUtils.mergeListsDedupe(null, null,
+ Comparator.comparing(V1EnvVar::getName), description));
+
+ // <primaryList> is <null>.
+ Assert.assertEquals("<primaryList> is null and <secondaryList> should be
returned",
+ inputEnvVars,
+ v1ControllerUtils.mergeListsDedupe(null, inputEnvVars,
+ Comparator.comparing(V1EnvVar::getName), description));
+
+ // <primaryList> is empty.
+ Assert.assertEquals("<primaryList> is empty and <secondaryList> should be
returned",
+ inputEnvVars,
+ v1ControllerUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
+ Comparator.comparing(V1EnvVar::getName), description));
+
+ // <secondaryList> is <null>.
+ Assert.assertEquals("<secondaryList> is null and <primaryList> should be
returned",
+ heronEnvVars,
+ v1ControllerUtils.mergeListsDedupe(heronEnvVars, null,
+ Comparator.comparing(V1EnvVar::getName), description));
+
+ // <secondaryList> is empty.
+ Assert.assertEquals("<secondaryList> is empty and <primaryList> should be
returned",
+ heronEnvVars,
+ v1ControllerUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
+ Comparator.comparing(V1EnvVar::getName), description));
+
+ // Merge both lists.
+ Assert.assertTrue("<primaryList> and <secondaryList> merged and
deduplicated",
+ expectedEnvVars.containsAll(
+ v1ControllerUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
+ Comparator.comparing(V1EnvVar::getName), description)));
+
+ // Expect thrown error.
+ String errorMessage = "";
+ try {
+ v1ControllerUtils.mergeListsDedupe(heronEnvVars,
Collections.singletonList(new V1EnvVar()),
+ Comparator.comparing(V1EnvVar::getName), description);
+ } catch (TopologySubmissionException e) {
+ errorMessage = e.getMessage();
+ }
+ Assert.assertTrue("Expecting error to be thrown for null deduplication
key",
+ errorMessage.contains(description));
+ }
+}
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
new file mode 100644
index 0000000..c777383
--- /dev/null
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -0,0 +1,740 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerBuilder;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class V1ControllerTest {
+
+ private static final String TOPOLOGY_NAME = "topology-name";
+ private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+ private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+ private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+ String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+ private static final String POD_TEMPLATE_VALID =
+ "apiVersion: apps/v1\n"
+ + "kind: PodTemplate\n"
+ + "metadata:\n"
+ + " name: heron-tracker\n"
+ + " namespace: default\n"
+ + "template:\n"
+ + " metadata:\n"
+ + " labels:\n"
+ + " app: heron-tracker\n"
+ + " spec:\n"
+ + " containers:\n"
+ + " - name: heron-tracker\n"
+ + " image: apache/heron:latest\n"
+ + " ports:\n"
+ + " - containerPort: 8888\n"
+ + " name: api-port\n"
+ + " resources:\n"
+ + " requests:\n"
+ + " cpu: \"100m\"\n"
+ + " memory: \"200M\"\n"
+ + " limits:\n"
+ + " cpu: \"400m\"\n"
+ + " memory: \"512M\"";
+
+ private final Config config = Config.newBuilder().build();
+ private final Config configWithPodTemplate = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
CONFIGMAP_POD_TEMPLATE_NAME)
+ .build();
+ private final Config runtime = Config.newBuilder()
+ .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+ .build();
+ private final Config configDisabledPodTemplate = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
CONFIGMAP_POD_TEMPLATE_NAME)
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_DISABLED,
"true")
+ .build();
+
+ @Spy
+ private final V1Controller v1ControllerWithPodTemplate =
+ new V1Controller(configWithPodTemplate, runtime);
+
+ @Spy
+ private final V1Controller v1ControllerPodTemplate =
+ new V1Controller(configDisabledPodTemplate, runtime);
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testLoadPodFromTemplateDefault() {
+ final V1Controller v1ControllerNoPodTemplate = new V1Controller(config,
runtime);
+ final V1PodTemplateSpec podSpec =
v1ControllerNoPodTemplate.loadPodFromTemplate();
+
+ Assert.assertEquals(podSpec, new V1PodTemplateSpec());
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNullConfigMap() {
+ final String expected = "unable to locate";
+ String message = "";
+
+ doReturn(null)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNoConfigMap() {
+ final String expected = "Failed to locate Pod Template";
+ String message = "";
+
+ doReturn(new V1ConfigMap())
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNoTargetConfigMap() {
+ final String expected = "Failed to locate Pod Template";
+ String message = "";
+ V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData("Dummy Key", "Dummy Value")
+ .build();
+
+ doReturn(configMapNoTargetData)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testLoadPodFromTemplateBadTargetConfigMap() {
+ final String expected = "Error parsing";
+ String message = "";
+
+ // ConfigMap with target ConfigMap and an invalid Pod Template.
+ V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, "Dummy Value")
+ .build();
+
+ doReturn(configMapInvalidPod)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue("Invalid Pod Template parsing should fail",
message.contains(expected));
+
+ // ConfigMap with target ConfigMaps and an empty Pod Template.
+ V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, "")
+ .build();
+
+ doReturn(configMapEmptyPod)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue("Empty Pod Template parsing should fail",
message.contains(expected));
+ }
+
+ @Test
+ public void testLoadPodFromTemplateValidConfigMap() {
+ final String expected =
+ " containers: [class V1Container {\n"
+ + " args: null\n"
+ + " command: null\n"
+ + " env: null\n"
+ + " envFrom: null\n"
+ + " image: apache/heron:latest\n"
+ + " imagePullPolicy: null\n"
+ + " lifecycle: null\n"
+ + " livenessProbe: null\n"
+ + " name: heron-tracker\n"
+ + " ports: [class V1ContainerPort {\n"
+ + " containerPort: 8888\n"
+ + " hostIP: null\n"
+ + " hostPort: null\n"
+ + " name: api-port\n"
+ + " protocol: null\n"
+ + " }]\n"
+ + " readinessProbe: null\n"
+ + " resources: class V1ResourceRequirements {\n"
+ + " limits: {cpu=Quantity{number=0.400,
format=DECIMAL_SI}, "
+ + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
+ + " requests: {cpu=Quantity{number=0.100,
format=DECIMAL_SI}, "
+ + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
+ + " }\n"
+ + " securityContext: null\n"
+ + " startupProbe: null\n"
+ + " stdin: null\n"
+ + " stdinOnce: null\n"
+ + " terminationMessagePath: null\n"
+ + " terminationMessagePolicy: null\n"
+ + " tty: null\n"
+ + " volumeDevices: null\n"
+ + " volumeMounts: null\n"
+ + " workingDir: null\n"
+ + " }]";
+
+
+ // ConfigMap with valid Pod Template.
+ V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+ .build();
+ doReturn(configMapValidPod)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ V1PodTemplateSpec podTemplateSpec =
v1ControllerWithPodTemplate.loadPodFromTemplate();
+
+ Assert.assertTrue(podTemplateSpec.toString().contains(expected));
+ }
+
+ @Test
+ public void testLoadPodFromTemplateInvalidConfigMap() {
+ // ConfigMap with an invalid Pod Template.
+ final String invalidPodTemplate =
+ "apiVersion: apps/v1\n"
+ + "kind: InvalidTemplate\n"
+ + "metadata:\n"
+ + " name: heron-tracker\n"
+ + " namespace: default\n"
+ + "template:\n"
+ + " metadata:\n"
+ + " labels:\n"
+ + " app: heron-tracker\n"
+ + " spec:\n";
+ V1ConfigMap configMap = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
+ .build();
+ final String expected = "Error parsing";
+ String message = "";
+
+ doReturn(configMap)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testDisablePodTemplates() {
+ // ConfigMap with valid Pod Template.
+ V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+ .build();
+ final String expected = "Pod Templates are disabled";
+ String message = "";
+ doReturn(configMapValidPod)
+ .when(v1ControllerPodTemplate)
+ .getConfigMap(anyString());
+
+ try {
+ v1ControllerPodTemplate.loadPodFromTemplate();
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testGetPodTemplateLocationPassing() {
+ final Config testConfig = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
CONFIGMAP_POD_TEMPLATE_NAME)
+ .build();
+ final V1Controller v1Controller = new V1Controller(testConfig, runtime);
+ final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME,
POD_TEMPLATE_NAME);
+ Pair<String, String> actual;
+
+ // Correct parsing
+ actual = v1Controller.getPodTemplateLocation();
+ Assert.assertEquals(actual, expected);
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoConfigMap() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ ".POD-TEMPLATE-NAME").build();
+ V1Controller v1Controller = new V1Controller(testConfig, runtime);
+ v1Controller.getPodTemplateLocation();
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoPodTemplate() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ "CONFIGMAP-NAME.").build();
+ V1Controller v1Controller = new V1Controller(testConfig, runtime);
+ v1Controller.getPodTemplateLocation();
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoDelimiter() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+ "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
+ V1Controller v1Controller = new V1Controller(testConfig, runtime);
+ v1Controller.getPodTemplateLocation();
+ }
+
+ @Test
+ public void testConfigureContainerPorts() {
+ final String portNamekept = "random-port-to-be-kept";
+ final int portNumberkept = 1111;
+ final int numInstances = 3;
+ final List<V1ContainerPort> expectedPortsBase =
+ Collections.unmodifiableList(V1Controller.getExecutorPorts());
+ final List<V1ContainerPort> debugPorts =
+
Collections.unmodifiableList(V1Controller.getDebuggingPorts(numInstances));
+ final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
+ Arrays.asList(
+ new V1ContainerPort()
+
.name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
+ new V1ContainerPort()
+
.name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
+ new
V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
+ )
+ );
+
+ // Null ports. This is the default case.
+ final V1Container inputContainerWithNullPorts = new
V1ContainerBuilder().build();
+ v1ControllerWithPodTemplate.configureContainerPorts(false, 0,
inputContainerWithNullPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container with null ports
list",
+ CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(),
expectedPortsBase));
+
+ // Empty ports.
+ final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
+ .withPorts(new LinkedList<>())
+ .build();
+ v1ControllerWithPodTemplate.configureContainerPorts(false, 0,
inputContainerWithEmptyPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container with empty
ports list",
+ CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(),
expectedPortsBase));
+
+ // Port overriding.
+ final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
+ final V1Container inputContainerWithPorts = new V1ContainerBuilder()
+ .withPorts(inputPorts)
+ .build();
+ final List<V1ContainerPort> expectedPortsOverriding = new
LinkedList<>(expectedPortsBase);
+ expectedPortsOverriding
+ .add(new
V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+
+ v1ControllerWithPodTemplate.configureContainerPorts(false, 0,
inputContainerWithPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container should be
overwritten.",
+ CollectionUtils.containsAll(inputContainerWithPorts.getPorts(),
expectedPortsOverriding));
+
+ // Port overriding with debug ports.
+ final List<V1ContainerPort> inputPortsWithDebug = new
LinkedList<>(debugPorts);
+ inputPortsWithDebug.addAll(inputPortsBase);
+ final V1Container inputContainerWithDebug = new V1ContainerBuilder()
+ .withPorts(inputPortsWithDebug)
+ .build();
+ final List<V1ContainerPort> expectedPortsDebug = new
LinkedList<>(expectedPortsBase);
+ expectedPortsDebug.add(new
V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+ expectedPortsDebug.addAll(debugPorts);
+
+ v1ControllerWithPodTemplate.configureContainerPorts(
+ true, numInstances, inputContainerWithDebug);
+ Assert.assertTrue("Server and/or shell with debug PORTS for container
should be overwritten.",
+ CollectionUtils.containsAll(inputContainerWithDebug.getPorts(),
expectedPortsDebug));
+ }
+
+ @Test
+ public void testConfigureContainerEnvVars() {
+ final List<V1EnvVar> heronEnvVars =
+ Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+ final V1EnvVar additionEnvVar = new V1EnvVar()
+ .name("env-variable-to-be-kept")
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-variable-was-kept")));
+ final List<V1EnvVar> inputEnvVars = Arrays.asList(
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_HOST)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-host-to-be-replaced"))),
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_POD_NAME)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("pod-name-to-be-replaced"))),
+ additionEnvVar
+ );
+
+ // Null env vars. This is the default case.
+ V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
+
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars
should match",
+ CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(),
heronEnvVars));
+
+ // Empty env vars.
+ V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
+ .withEnv(new LinkedList<>())
+ .build();
+
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env
Vars should match",
+ CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(),
heronEnvVars));
+
+ // Env Var overriding.
+ final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
+ expectedOverriding.add(additionEnvVar);
+ V1Container containerWithEnvVars = new V1ContainerBuilder()
+ .withEnv(inputEnvVars)
+ .build();
+
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars
should be overridden",
+ CollectionUtils.containsAll(containerWithEnvVars.getEnv(),
expectedOverriding));
+ }
+
+ @Test
+ public void testConfigureContainerResources() {
+ final Resource resourceDefault = new Resource(
+ 9, ByteAmount.fromGigabytes(19), ByteAmount.fromGigabytes(99));
+ final Resource resourceCustom = new Resource(
+ 4, ByteAmount.fromGigabytes(34), ByteAmount.fromGigabytes(400));
+
+ final Quantity defaultRAM = Quantity.fromString(
+ KubernetesUtils.Megabytes(resourceDefault.getRam()));
+ final Quantity defaultCPU = Quantity.fromString(
+ Double.toString(V1Controller.roundDecimal(resourceDefault.getCpu(),
3)));
+ final Quantity customRAM = Quantity.fromString(
+ KubernetesUtils.Megabytes(resourceCustom.getRam()));
+ final Quantity customCPU = Quantity.fromString(
+ Double.toString(V1Controller.roundDecimal(resourceCustom.getCpu(),
3)));
+ final Quantity customDisk = Quantity.fromString(
+
Double.toString(V1Controller.roundDecimal(resourceCustom.getDisk().getValue(),
3)));
+
+ final Config configNoLimit = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
+ .build();
+ final Config configWithLimit = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE,
"EQUAL_TO_LIMIT")
+ .build();
+
+ final V1ResourceRequirements expectDefaultRequirements = new
V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+ .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
+
+ final V1ResourceRequirements expectCustomRequirements = new
V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+ .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
+ .putLimitsItem("disk", customDisk);
+
+ final V1ResourceRequirements customRequirements = new
V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
+ .putLimitsItem(KubernetesConstants.CPU, customCPU)
+ .putLimitsItem("disk", customDisk);
+
+ // Default. Null resources.
+ V1Container containerNull = new V1ContainerBuilder().build();
+ v1ControllerWithPodTemplate.configureContainerResources(
+ containerNull, configNoLimit, resourceDefault);
+ Assert.assertTrue("Default LIMITS should be set in container with null
LIMITS",
+ containerNull.getResources().getLimits().entrySet()
+ .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+ // Empty resources.
+ V1Container containerEmpty = new
V1ContainerBuilder().withNewResources().endResources().build();
+ v1ControllerWithPodTemplate.configureContainerResources(
+ containerEmpty, configNoLimit, resourceDefault);
+ Assert.assertTrue("Default LIMITS should be set in container with empty
LIMITS",
+ containerNull.getResources().getLimits().entrySet()
+ .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+ // Custom resources.
+ V1Container containerCustom = new V1ContainerBuilder()
+ .withResources(customRequirements)
+ .build();
+ v1ControllerWithPodTemplate.configureContainerResources(
+ containerCustom, configNoLimit, resourceDefault);
+ Assert.assertTrue("Custom LIMITS should be set in container with custom
LIMITS",
+ containerCustom.getResources().getLimits().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+
+ // Custom resources with request.
+ V1Container containerRequests = new V1ContainerBuilder()
+ .withResources(customRequirements)
+ .build();
+ v1ControllerWithPodTemplate.configureContainerResources(
+ containerRequests, configWithLimit, resourceDefault);
+ Assert.assertTrue("Custom LIMITS should be set in container with custom
LIMITS and REQUEST",
+ containerRequests.getResources().getLimits().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+ Assert.assertTrue("Custom REQUEST should be set in container with custom
LIMITS and REQUEST",
+ containerRequests.getResources().getRequests().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+ }
+
+ @Test
+ public void testAddVolumesIfPresent() {
+ final String pathDefault = "config-host-volume-path";
+ final String pathNameDefault = "config-host-volume-name";
+ final Config configWithVolumes = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_VOLUME_NAME, pathNameDefault)
+ .put(KubernetesContext.KUBERNETES_VOLUME_TYPE, Volumes.HOST_PATH)
+ .put(KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PATH, pathDefault)
+ .build();
+ final V1Controller controllerWithVol = new V1Controller(configWithVolumes,
runtime);
+
+ final V1Volume volumeDefault = new V1VolumeBuilder()
+ .withName(pathNameDefault)
+ .withNewHostPath()
+ .withNewPath(pathDefault)
+ .endHostPath()
+ .build();
+ final V1Volume volumeToBeKept = new V1VolumeBuilder()
+ .withName("volume-to-be-kept-name")
+ .withNewHostPath()
+ .withNewPath("volume-to-be-kept-path")
+ .endHostPath()
+ .build();
+
+ final List<V1Volume> customVolumeList = Arrays.asList(
+ new V1VolumeBuilder()
+ .withName(pathNameDefault)
+ .withNewHostPath()
+ .withNewPath("this-path-must-be-replaced")
+ .endHostPath()
+ .build(),
+ volumeToBeKept
+ );
+ final List<V1Volume> expectedDefault =
Collections.singletonList(volumeDefault);
+ final List<V1Volume> expectedCustom = Arrays.asList(volumeDefault,
volumeToBeKept);
+
+ // No Volumes set.
+ V1Controller controllerDoNotSetVolumes = new
V1Controller(Config.newBuilder().build(), runtime);
+ V1PodSpec podSpecNoSetVolumes = new V1PodSpec();
+ controllerDoNotSetVolumes.addVolumesIfPresent(podSpecNoSetVolumes);
+ Assert.assertNull(podSpecNoSetVolumes.getVolumes());
+
+ // Default. Null Volumes.
+ V1PodSpec podSpecNull = new V1PodSpecBuilder().build();
+ controllerWithVol.addVolumesIfPresent(podSpecNull);
+ Assert.assertTrue("Default VOLUMES should be set in container with null
VOLUMES",
+ CollectionUtils.containsAll(expectedDefault,
podSpecNull.getVolumes()));
+
+ // Empty Volumes list
+ V1PodSpec podSpecEmpty = new V1PodSpecBuilder()
+ .withVolumes(new LinkedList<>())
+ .build();
+ controllerWithVol.addVolumesIfPresent(podSpecEmpty);
+ Assert.assertTrue("Default VOLUMES should be set in container with empty
VOLUMES",
+ CollectionUtils.containsAll(expectedDefault,
podSpecEmpty.getVolumes()));
+
+ // Custom Volumes list
+ V1PodSpec podSpecCustom = new V1PodSpecBuilder()
+ .withVolumes(customVolumeList)
+ .build();
+ controllerWithVol.addVolumesIfPresent(podSpecCustom);
+ Assert.assertTrue("Default VOLUMES should be set in container with custom
VOLUMES",
+ CollectionUtils.containsAll(expectedCustom,
podSpecCustom.getVolumes()));
+ }
+
+ @Test
+ public void testMountVolumeIfPresent() {
+ final String pathDefault = "config-host-volume-path";
+ final String pathNameDefault = "config-host-volume-name";
+ final Config configWithVolumes = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME,
pathNameDefault)
+ .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH,
pathDefault)
+ .build();
+ final V1Controller controllerWithMounts = new
V1Controller(configWithVolumes, runtime);
+ final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
+ .withName(pathNameDefault)
+ .withMountPath(pathDefault)
+ .build();
+ final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
+ .withName("custom-volume-mount")
+ .withMountPath("should-be-kept")
+ .build();
+
+ final List<V1VolumeMount> expectedMountsDefault =
Collections.singletonList(volumeDefault);
+ final List<V1VolumeMount> expectedMountsCustom =
Arrays.asList(volumeCustom, volumeDefault);
+ final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
+ new V1VolumeMountBuilder()
+ .withName(pathNameDefault)
+ .withMountPath("should-be-replaced")
+ .build(),
+ volumeCustom
+ );
+
+ // No Volume Mounts set.
+ V1Controller controllerDoNotSetMounts = new
V1Controller(Config.newBuilder().build(), runtime);
+ V1Container containerNoSetMounts = new V1Container();
+ controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
+ Assert.assertNull(containerNoSetMounts.getVolumeMounts());
+
+ // Default. Null Volume Mounts.
+ V1Container containerNull = new V1ContainerBuilder().build();
+ controllerWithMounts.mountVolumeIfPresent(containerNull);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with
null VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsDefault,
containerNull.getVolumeMounts()));
+
+ // Empty Volume Mounts.
+ V1Container containerEmpty = new V1ContainerBuilder()
+ .withVolumeMounts(new LinkedList<>())
+ .build();
+ controllerWithMounts.mountVolumeIfPresent(containerEmpty);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with
empty VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsDefault,
containerEmpty.getVolumeMounts()));
+
+ // Custom Volume Mounts.
+ V1Container containerCustom = new V1ContainerBuilder()
+ .withVolumeMounts(volumeMountsCustomList)
+ .build();
+ controllerWithMounts.mountVolumeIfPresent(containerCustom);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with
custom VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsCustom,
containerCustom.getVolumeMounts()));
+ }
+
+ @Test
+ public void testConfigureTolerations() {
+ final V1Toleration keptToleration = new V1Toleration()
+ .key("kept toleration")
+ .operator("Some Operator")
+ .effect("Some Effect")
+ .tolerationSeconds(5L);
+ final List<V1Toleration> expectedTolerationBase =
+ Collections.unmodifiableList(V1Controller.getTolerations());
+ final List<V1Toleration> inputTolerationsBase =
Collections.unmodifiableList(
+ Arrays.asList(
+ new V1Toleration()
+
.key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
+ new V1Toleration()
+
.key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
+ keptToleration
+ )
+ );
+
+ // Null Tolerations. This is the default case.
+ final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
+ v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
+ Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to
Heron's defaults",
+ CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
+ expectedTolerationBase));
+
+ // Empty Tolerations.
+ final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
+ .withTolerations(new LinkedList<>())
+ .build();
+
v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
+ Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to
Heron's defaults",
+
CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
+ expectedTolerationBase));
+
+ // Toleration overriding.
+ final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
+ .withTolerations(inputTolerationsBase)
+ .build();
+ final List<V1Toleration> expectedTolerationsOverriding =
+ new LinkedList<>(expectedTolerationBase);
+ expectedTolerationsOverriding.add(keptToleration);
+
+ v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
+ Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with
Heron's defaults",
+ CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
+ expectedTolerationsOverriding));
+ }
+}