This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch 
saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 8e67ad10cedf217cdec43ec3c22bad445ae28910
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Wed Jul 20 23:25:18 2022 -0400

    [KubernetesShim] removing unneeded methods that were relocated to Stateful 
Set factory.
---
 .../heron/scheduler/kubernetes/KubernetesShim.java | 720 -----------------
 .../scheduler/kubernetes/KubernetesShimTest.java   | 866 ---------------------
 2 files changed, 1586 deletions(-)

diff --git 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
index c122ff5cda3..de1f4764fb6 100644
--- 
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
+++ 
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -22,8 +22,6 @@ package org.apache.heron.scheduler.kubernetes;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -48,7 +46,6 @@ import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
 
-import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.custom.V1Patch;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
@@ -56,28 +53,18 @@ import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.AppsV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1Container;
-import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
 import io.kubernetes.client.openapi.models.V1EnvVarSource;
-import io.kubernetes.client.openapi.models.V1LabelSelector;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
-import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
-import io.kubernetes.client.openapi.models.V1ResourceRequirements;
-import io.kubernetes.client.openapi.models.V1SecretKeySelector;
-import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.openapi.models.V1Toleration;
-import io.kubernetes.client.openapi.models.V1Volume;
-import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.util.PatchUtils;
 import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
@@ -383,60 +370,6 @@ public class KubernetesShim extends KubernetesController {
             + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
-  /**
-   * Generates the command to start Heron within the <code>container</code>.
-   * @param containerId Passed down to <>SchedulerUtils</> to generate 
executor command.
-   * @param numOfInstances Used to configure the debugging ports.
-   * @param isExecutor Flag used to generate the correct <code>shard_id</code>.
-   * @return The complete command to start Heron in a <code>container</code>.
-   */
-  protected List<String> getExecutorCommand(String containerId, int 
numOfInstances,
-                                            boolean isExecutor) {
-    final Config configuration = getConfiguration();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
-    final Map<ExecutorPort, String> ports =
-        KubernetesConstants.EXECUTOR_PORTS.entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey,
-                e -> e.getValue().toString()));
-
-    if 
(TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))
-            && numOfInstances != 0) {
-      List<String> remoteDebuggingPorts = new LinkedList<>();
-      IntStream.range(0, numOfInstances).forEach(i -> {
-        int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i;
-        remoteDebuggingPorts.add(String.valueOf(port));
-      });
-      ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
-              String.join(",", remoteDebuggingPorts));
-    }
-
-    final String[] executorCommand =
-        SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
-            containerId, ports);
-    return Arrays.asList(
-        "sh",
-        "-c",
-        KubernetesUtils.getConfCommand(configuration)
-            + " && " + KubernetesUtils.getFetchCommand(configuration, 
runtimeConfiguration)
-            + " && " + setShardIdEnvironmentVariableCommand(isExecutor)
-            + " && " + String.join(" ", executorCommand)
-    );
-  }
-
-  /**
-   * Configures the <code>shard_id</code> for the Heron container based on 
whether it is an <code>executor</code>
-   * or <code>manager</code>. <code>executor</code> IDs are [1 - n) and the 
<code>manager</code> IDs start at 0.
-   * @param isExecutor Switch flag to generate correct command.
-   * @return The command required to put the Heron instance in 
<code>executor</code> or <code>manager</code> mode.
-   */
-  @VisibleForTesting
-  protected static String setShardIdEnvironmentVariableCommand(boolean 
isExecutor) {
-    final String pattern = String.format("%%s=%s && echo shardId=${%%s}",
-        isExecutor ? "$((${POD_NAME##*-} + 1))" : "${POD_NAME##*-}");
-    return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
-  }
-
   /**
    * Creates a headless <code>Service</code> to facilitate communication 
between Pods in a <code>topology</code>.
    * @return A fully configured <code>Service</code> to be used by a 
<code>topology</code>.
@@ -463,92 +396,6 @@ public class KubernetesShim extends KubernetesController {
     return service;
   }
 
-  /**
-   * Creates and configures the <code>StatefulSet</code> which the topology's 
<code>executor</code>s will run in.
-   * @param containerResource Passed down to configure the 
<code>executor</code> resource limits.
-   * @param numberOfInstances Used to configure the execution command and 
ports for the <code>executor</code>.
-   * @param isExecutor Flag used to configure components specific to 
<code>executor</code> and <code>manager</code>.
-   * @return A fully configured <code>V1StatefulSet</code> for the topology's 
<code>executors</code>.
-   */
-  private V1StatefulSet createStatefulSet(Resource containerResource, int 
numberOfInstances,
-                                          boolean isExecutor) {
-    final String topologyName = getTopologyName();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
-
-    final List<V1Volume> volumes = new LinkedList<>();
-    final List<V1VolumeMount> volumeMounts = new LinkedList<>();
-
-    // Collect Persistent Volume Claim configurations from the CLI.
-    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configsPVC =
-        KubernetesContext.getVolumeClaimTemplates(getConfiguration(), 
isExecutor);
-
-    // Collect all Volume configurations from the CLI and generate Volumes and 
Volume Mounts.
-    createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, 
volumeMounts);
-    createVolumeAndMountsHostPathCLI(
-        KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), 
volumes, volumeMounts);
-    createVolumeAndMountsEmptyDirCLI(
-        KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), 
volumes, volumeMounts);
-    createVolumeAndMountsNFSCLI(
-        KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), 
volumes, volumeMounts);
-
-    final V1StatefulSet statefulSet = new V1StatefulSet();
-
-    // Setup StatefulSet's metadata.
-    final V1ObjectMeta objectMeta = new V1ObjectMeta()
-        .name(getStatefulSetName(isExecutor))
-        .labels(getPodLabels(topologyName));
-    statefulSet.setMetadata(objectMeta);
-
-    // Create the StatefulSet Spec.
-    // Reduce replica count by one for Executors and set to one for Manager.
-    final int replicasCount =
-        isExecutor ? Runtime.numContainers(runtimeConfiguration).intValue() - 
1 : 1;
-    final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec()
-        .serviceName(topologyName)
-        .replicas(replicasCount);
-
-    // Parallel pod management tells the StatefulSet controller to launch or 
terminate
-    // all Pods in parallel, and not to wait for Pods to become Running and 
Ready or completely
-    // terminated prior to launching or terminating another Pod.
-    statefulSetSpec.setPodManagementPolicy("Parallel");
-
-    // Add selector match labels "app=heron" and "topology=topology-name"
-    // so we know which pods to manage.
-    final V1LabelSelector selector = new V1LabelSelector()
-        .matchLabels(getPodMatchLabels(topologyName));
-    statefulSetSpec.setSelector(selector);
-
-    // Create a Pod Template.
-    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
-
-    // Set up Pod Metadata.
-    final V1ObjectMeta templateMetaData = new 
V1ObjectMeta().labels(getPodLabels(topologyName));
-    Map<String, String> annotations = new HashMap<>();
-    annotations.putAll(getPodAnnotations());
-    annotations.putAll(getPrometheusAnnotations());
-    templateMetaData.setAnnotations(annotations);
-    podTemplateSpec.setMetadata(templateMetaData);
-
-    configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, 
isExecutor,
-        volumes, volumeMounts);
-
-    statefulSetSpec.setTemplate(podTemplateSpec);
-
-    statefulSet.setSpec(statefulSetSpec);
-
-    
statefulSetSpec.setVolumeClaimTemplates(createPersistentVolumeClaims(configsPVC));
-
-    return statefulSet;
-  }
-
-  /**
-   * Extracts general Pod <code>Annotation</code>s from configurations.
-   * @return Key-value pairs of general <code>Annotation</code>s to be added 
to the Pod.
-   */
-  private Map<String, String> getPodAnnotations() {
-    return KubernetesContext.getPodAnnotations(getConfiguration());
-  }
-
   /**
    * Extracts <code>Service Annotations</code> for configurations.
    * @return Key-value pairs of service <code>Annotation</code>s to be added 
to the Pod.
@@ -557,19 +404,6 @@ public class KubernetesShim extends KubernetesController {
     return KubernetesContext.getServiceAnnotations(getConfiguration());
   }
 
-  /**
-   * Generates <code>Label</code>s to indicate Prometheus scraping and the 
exposed port.
-   * @return Key-value pairs of Prometheus <code>Annotation</code>s to be 
added to the Pod.
-   */
-  private Map<String, String> getPrometheusAnnotations() {
-    final Map<String, String> annotations = new HashMap<>();
-    annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
-    annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT,
-        KubernetesConstants.PROMETHEUS_PORT);
-
-    return annotations;
-  }
-
   /**
    * Generates the <code>heron</code> and <code>topology</code> name 
<code>Match Label</code>s.
    * @param topologyName Name of the <code>topology</code>.
@@ -582,20 +416,6 @@ public class KubernetesShim extends KubernetesController {
     return labels;
   }
 
-  /**
-   * Extracts <code>Label</code>s from configurations, generates the 
<code>heron</code> and
-   * <code>topology</code> name <code>Label</code>s.
-   * @param topologyName Name of the <code>topology</code>.
-   * @return Key-value pairs of <code>Label</code>s to be added to the Pod.
-   */
-  private Map<String, String> getPodLabels(String topologyName) {
-    final Map<String, String> labels = new HashMap<>();
-    labels.put(KubernetesConstants.LABEL_APP, 
KubernetesConstants.LABEL_APP_VALUE);
-    labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
-    labels.putAll(KubernetesContext.getPodLabels(getConfiguration()));
-    return labels;
-  }
-
   /**
    * Extracts <code>Selector Labels</code> for<code>Service</code>s from 
configurations.
    * @return Key-value pairs of <code>Service Labels</code> to be added to the 
Pod.
@@ -604,401 +424,6 @@ public class KubernetesShim extends KubernetesController {
     return KubernetesContext.getServiceLabels(getConfiguration());
   }
 
-  /**
-   * Configures the <code>Pod Spec</code> section of the 
<code>StatefulSet</code>. The <code>Heron</code> container
-   * will be configured to allow it to function but other supplied containers 
are loaded verbatim.
-   * @param podTemplateSpec The <code>Pod Template Spec</code> section to 
update.
-   * @param resource Passed down to configure the resource limits.
-   * @param numberOfInstances Passed down to configure the ports.
-   * @param isExecutor Flag used to configure components specific to 
<code>Executor</code> and <code>Manager</code>.
-   * @param volumes <code>Volumes</code> generated from configurations options.
-   * @param volumeMounts <code>Volume Mounts</code> generated from 
configurations options.
-   */
-  private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec, 
Resource resource,
-      int numberOfInstances, boolean isExecutor, List<V1Volume> volumes,
-      List<V1VolumeMount> volumeMounts) {
-    if (podTemplateSpec.getSpec() == null) {
-      podTemplateSpec.setSpec(new V1PodSpec());
-    }
-    final V1PodSpec podSpec = podTemplateSpec.getSpec();
-
-    // Set the termination period to 0 so pods can be deleted quickly
-    podSpec.setTerminationGracePeriodSeconds(0L);
-
-    // Set the pod tolerations so pods are rescheduled when nodes go down
-    // 
https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
-    configureTolerations(podSpec);
-
-    // Get <Heron> container and ignore all others.
-    final String containerName =
-        isExecutor ? KubernetesConstants.EXECUTOR_NAME : 
KubernetesConstants.MANAGER_NAME;
-    V1Container heronContainer = null;
-    List<V1Container> containers = podSpec.getContainers();
-    if (containers != null) {
-      for (V1Container container : containers) {
-        final String name = container.getName();
-        if (name != null && name.equals(containerName)) {
-          if (heronContainer != null) {
-            throw new TopologySubmissionException(
-                String.format("Multiple configurations found for '%s' 
container", containerName));
-          }
-          heronContainer = container;
-        }
-      }
-    } else {
-      containers = new LinkedList<>();
-    }
-
-    if (heronContainer == null) {
-      heronContainer = new V1Container().name(containerName);
-      containers.add(heronContainer);
-    }
-
-    if (!volumes.isEmpty() || !volumeMounts.isEmpty()) {
-      configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes,
-          volumeMounts);
-    }
-
-    configureHeronContainer(resource, numberOfInstances, heronContainer, 
isExecutor);
-
-    podSpec.setContainers(containers);
-
-    mountSecretsAsVolumes(podSpec);
-  }
-
-  /**
-   * Adds <code>tolerations</code> to the <code>Pod Spec</code> with Heron's 
values taking precedence.
-   * @param spec <code>Pod Spec</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureTolerations(final V1PodSpec spec) {
-    KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
-    spec.setTolerations(
-        utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
-            Comparator.comparing(V1Toleration::getKey), "Pod Specification 
Tolerations")
-    );
-  }
-
-  /**
-   * Generates a list of <code>tolerations</code> which Heron requires.
-   * @return A list of configured <code>tolerations</code>.
-   */
-  @VisibleForTesting
-  protected static List<V1Toleration> getTolerations() {
-    final List<V1Toleration> tolerations = new ArrayList<>();
-    KubernetesConstants.TOLERATIONS.forEach(t -> {
-      final V1Toleration toleration =
-          new V1Toleration()
-              .key(t)
-              .operator("Exists")
-              .effect("NoExecute")
-              .tolerationSeconds(10L);
-      tolerations.add(toleration);
-    });
-
-    return tolerations;
-  }
-
-  /**
-   * Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod.
-   * @param podSpec <code>Pod Spec</code> to add secrets to.
-   */
-  private void mountSecretsAsVolumes(V1PodSpec podSpec) {
-    final Config config = getConfiguration();
-    final Map<String, String> secrets = 
KubernetesContext.getPodSecretsToMount(config);
-    for (Map.Entry<String, String> secret : secrets.entrySet()) {
-      final V1VolumeMount mount = new V1VolumeMount()
-              .name(secret.getKey())
-              .mountPath(secret.getValue());
-      final V1Volume secretVolume = new V1Volume()
-              .name(secret.getKey())
-              .secret(new V1SecretVolumeSourceBuilder()
-                      .withSecretName(secret.getKey())
-                      .build());
-      podSpec.addVolumesItem(secretVolume);
-      for (V1Container container : podSpec.getContainers()) {
-        container.addVolumeMountsItem(mount);
-      }
-    }
-  }
-
-  /**
-   * Configures the <code>Heron</code> container with values for parameters 
Heron requires for functioning.
-   * @param resource Resource limits.
-   * @param numberOfInstances Required number of <code>executor</code> 
containers which is used to configure ports.
-   * @param container The <code>executor</code> container to be configured.
-   * @param isExecutor Flag indicating whether to set a <code>executor</code> 
or <code>manager</code> command.
-   */
-  private void configureHeronContainer(Resource resource, int 
numberOfInstances,
-                                       final V1Container container, boolean 
isExecutor) {
-    final Config configuration = getConfiguration();
-
-    // Set up the container images.
-    
container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
-
-    // Set up the container command.
-    final List<String> command =
-        getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances, isExecutor);
-    container.setCommand(command);
-
-    if (KubernetesContext.hasImagePullPolicy(configuration)) {
-      
container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration));
-    }
-
-    // Configure environment variables.
-    configureContainerEnvVars(container);
-
-    // Set secret keys.
-    setSecretKeyRefs(container);
-
-    // Set container resources
-    configureContainerResources(container, configuration, resource, 
isExecutor);
-
-    // Set container ports.
-    final boolean debuggingEnabled =
-        TopologyUtils.getTopologyRemoteDebuggingEnabled(
-            Runtime.topology(getRuntimeConfiguration()));
-    configureContainerPorts(debuggingEnabled, numberOfInstances, container);
-
-    // setup volume mounts
-    mountVolumeIfPresent(container);
-  }
-
-  /**
-   * Configures the resources in the <code>container</code> with values in the 
<code>config</code> taking precedence.
-   * @param container The <code>container</code> to be configured.
-   * @param configuration The <code>Config</code> object to check if a 
resource request needs to be set.
-   * @param resource User defined resources limits from input.
-   * @param isExecutor Flag to indicate configuration for an 
<code>executor</code> or <code>manager</code>.
-   */
-  @VisibleForTesting
-  protected void configureContainerResources(final V1Container container,
-                                             final Config configuration, final 
Resource resource,
-                                             boolean isExecutor) {
-    if (container.getResources() == null) {
-      container.setResources(new V1ResourceRequirements());
-    }
-    final V1ResourceRequirements resourceRequirements = 
container.getResources();
-
-    // Collect Limits and Requests from CLI.
-    final Map<String, Quantity> limitsCLI = createResourcesRequirement(
-        KubernetesContext.getResourceLimits(configuration, isExecutor));
-    final Map<String, Quantity> requestsCLI = createResourcesRequirement(
-        KubernetesContext.getResourceRequests(configuration, isExecutor));
-
-    if (resourceRequirements.getLimits() == null) {
-      resourceRequirements.setLimits(new HashMap<>());
-    }
-
-    // Set Limits and Resources from CLI <if> available, <else> use Configs. 
Deduplicate on name
-    // with precedence [1] CLI, [2] Config.
-    final Map<String, Quantity> limits = resourceRequirements.getLimits();
-    final Quantity limitCPU = limitsCLI.getOrDefault(KubernetesConstants.CPU,
-        
Quantity.fromString(Double.toString(KubernetesUtils.roundDecimal(resource.getCpu(),
 3))));
-    final Quantity limitMEMORY = 
limitsCLI.getOrDefault(KubernetesConstants.MEMORY,
-        Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
-
-    limits.put(KubernetesConstants.MEMORY, limitMEMORY);
-    limits.put(KubernetesConstants.CPU, limitCPU);
-
-    // Set the Kubernetes container resource request.
-    // Order: [1] CLI, [2] EQUAL_TO_LIMIT, [3] NOT_SET
-    KubernetesContext.KubernetesResourceRequestMode requestMode =
-        KubernetesContext.getKubernetesRequestMode(configuration);
-    if (!requestsCLI.isEmpty()) {
-      if (resourceRequirements.getRequests() == null) {
-        resourceRequirements.setRequests(new HashMap<>());
-      }
-      final Map<String, Quantity> requests = 
resourceRequirements.getRequests();
-
-      if (requestsCLI.containsKey(KubernetesConstants.MEMORY)) {
-        requests.put(KubernetesConstants.MEMORY, 
requestsCLI.get(KubernetesConstants.MEMORY));
-      }
-      if (requestsCLI.containsKey(KubernetesConstants.CPU)) {
-        requests.put(KubernetesConstants.CPU, 
requestsCLI.get(KubernetesConstants.CPU));
-      }
-    } else if (requestMode == 
KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
-      LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
-      resourceRequirements.setRequests(limits);
-    } else {
-      LOG.log(Level.CONFIG, "Not setting K8s request because config was 
NOT_SET");
-    }
-    container.setResources(resourceRequirements);
-  }
-
-  /**
-   * Creates <code>Resource Requirements</code> from a Map of 
<code>Config</code> items for <code>CPU</code>
-   * and <code>Memory</code>.
-   * @param configs <code>Configs</code> to be parsed for configuration.
-   * @return Configured <code>Resource Requirements</code>. An 
<code>empty</code> map will be returned
-   * if there are no <code>configs</code>.
-   */
-  @VisibleForTesting
-  protected Map<String, Quantity> createResourcesRequirement(Map<String, 
String> configs) {
-    final Map<String, Quantity> requirements = new HashMap<>();
-
-    if (configs == null || configs.isEmpty()) {
-      return requirements;
-    }
-
-    final String memoryLimit = configs.get(KubernetesConstants.MEMORY);
-    if (memoryLimit != null && !memoryLimit.isEmpty()) {
-      requirements.put(KubernetesConstants.MEMORY, 
Quantity.fromString(memoryLimit));
-    }
-    final String cpuLimit = configs.get(KubernetesConstants.CPU);
-    if (cpuLimit != null && !cpuLimit.isEmpty()) {
-      requirements.put(KubernetesConstants.CPU, Quantity.fromString(cpuLimit));
-    }
-
-    return requirements;
-  }
-
-  /**
-   * Configures the environment variables in the <code>container</code> with 
those Heron requires.
-   * Heron's values take precedence.
-   * @param container The <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureContainerEnvVars(final V1Container container) {
-    // Deduplicate on var name with Heron defaults take precedence.
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new 
KubernetesUtils.V1ControllerUtils<>();
-    container.setEnv(
-        utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
-          Comparator.comparing(V1EnvVar::getName), "Pod Template Environment 
Variables")
-    );
-  }
-
-  /**
-   * Generates a list of <code>Environment Variables</code> required by Heron 
to function.
-   * @return A list of configured <code>Environment Variables</code> required 
by Heron to function.
-   */
-  @VisibleForTesting
-  protected static List<V1EnvVar> getExecutorEnvVars() {
-    final V1EnvVar envVarHost = new V1EnvVar();
-    envVarHost.name(KubernetesConstants.ENV_HOST)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_IP)));
-
-    final V1EnvVar envVarPodName = new V1EnvVar();
-    envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_NAME)));
-
-    return Arrays.asList(envVarHost, envVarPodName);
-  }
-
-  /**
-   * Configures the ports in the <code>container</code> with those Heron 
requires. Heron's values take precedence.
-   * @param remoteDebugEnabled Flag used to indicate if debugging ports need 
to be added.
-   * @param numberOfInstances The number of debugging ports to be opened.
-   * @param container <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureContainerPorts(boolean remoteDebugEnabled, int 
numberOfInstances,
-                                         final V1Container container) {
-    List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts());
-
-    if (remoteDebugEnabled) {
-      ports.addAll(getDebuggingPorts(numberOfInstances));
-    }
-
-    // Set container ports. Deduplicate using port number with Heron defaults 
taking precedence.
-    KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
-    container.setPorts(
-        utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
-            Comparator.comparing(V1ContainerPort::getContainerPort), "Pod 
Template Ports")
-    );
-  }
-
-  /**
-   * Generates a list of <code>ports</code> required by Heron to function.
-   * @return A list of configured <code>ports</code> required by Heron to 
function.
-   */
-  @VisibleForTesting
-  protected static List<V1ContainerPort> getExecutorPorts() {
-    List<V1ContainerPort> ports = new LinkedList<>();
-    KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
-      final V1ContainerPort port = new V1ContainerPort()
-          .name(p.getName())
-          .containerPort(v);
-      ports.add(port);
-    });
-    return ports;
-  }
-
-  /**
-   * Generate the debugging ports required by Heron.
-   * @param numberOfInstances The number of debugging ports to generate.
-   * @return A list of configured debugging <code>ports</code>.
-   */
-  @VisibleForTesting
-  protected static List<V1ContainerPort> getDebuggingPorts(int 
numberOfInstances) {
-    List<V1ContainerPort> ports = new LinkedList<>();
-    IntStream.range(0, numberOfInstances).forEach(i -> {
-      final String portName =
-          KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
-      final V1ContainerPort port = new V1ContainerPort()
-          .name(portName)
-          .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
-      ports.add(port);
-    });
-    return ports;
-  }
-
-  /**
-   * Adds volume mounts to the <code>container</code> that Heron requires. 
Heron's values taking precedence.
-   * @param container <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void mountVolumeIfPresent(final V1Container container) {
-    final Config config = getConfiguration();
-    if (KubernetesContext.hasContainerVolume(config)) {
-      final V1VolumeMount mount =
-          new V1VolumeMount()
-              .name(KubernetesContext.getContainerVolumeName(config))
-              
.mountPath(KubernetesContext.getContainerVolumeMountPath(config));
-
-      // Merge volume mounts. Deduplicate using mount's name with Heron 
defaults taking precedence.
-      KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
-          new KubernetesUtils.V1ControllerUtils<>();
-      container.setVolumeMounts(
-          utils.mergeListsDedupe(Collections.singletonList(mount), 
container.getVolumeMounts(),
-              Comparator.comparing(V1VolumeMount::getName), "Pod Template 
Volume Mounts")
-      );
-    }
-  }
-
-  /**
-   * Adds <code>Secret Key</code> references to a <code>container</code>.
-   * @param container <code>container</code> to be configured.
-   */
-  private void setSecretKeyRefs(V1Container container) {
-    final Config config = getConfiguration();
-    final Map<String, String> podSecretKeyRefs = 
KubernetesContext.getPodSecretKeyRefs(config);
-    for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
-      final String[] keyRefParts = secret.getValue().split(":");
-      if (keyRefParts.length != 2) {
-        LOG.log(Level.SEVERE,
-                "SecretKeyRef must be in the form name:key. <" + 
secret.getValue() + ">");
-        throw new TopologyRuntimeManagementException(
-                "SecretKeyRef must be in the form name:key. <" + 
secret.getValue() + ">");
-      }
-      String name = keyRefParts[0];
-      String key = keyRefParts[1];
-      final V1EnvVar envVar = new V1EnvVar()
-              .name(secret.getKey())
-                .valueFrom(new V1EnvVarSource()
-                  .secretKeyRef(new V1SecretKeySelector()
-                          .key(key)
-                          .name(name)));
-      container.addEnvItem(envVar);
-    }
-  }
-
   /**
    * Initiates the process of locating and loading <code>Pod Template</code> 
from a <code>ConfigMap</code>.
    * The loaded text is then parsed into a usable <code>Pod Template</code>.
@@ -1106,151 +531,6 @@ public class KubernetesShim extends KubernetesController 
{
     }
   }
 
-  /**
-   * Generates <code>Persistent Volume Claims Templates</code> from a mapping 
of <code>Volumes</code>
-   * to <code>key-value</code> pairs of configuration options and values.
-   * @param mapOfOpts <code>Volume</code> to configuration 
<code>key-value</code> mappings.
-   * @return Fully populated list of only dynamically backed <code>Persistent 
Volume Claims</code>.
-   */
-  @VisibleForTesting
-  protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts) {
-
-    List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
-
-    // Iterate over all the PVC Volumes.
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
pvc
-        : mapOfOpts.entrySet()) {
-
-      // Only create claims for `OnDemand` volumes.
-      final String claimName = 
pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName);
-      if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        continue;
-      }
-
-      listOfPVCs.add(Volumes.get()
-          .createPersistentVolumeClaim(pvc.getKey(),
-              getPersistentVolumeClaimLabels(getTopologyName()), 
pvc.getValue()));
-    }
-    return listOfPVCs;
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>Persistent Volume Claims</code>s
-   *  to be placed in the <code>Executor</code> and <code>Manager</code> from 
options on the CLI.
-   * @param mapConfig Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsPersistentVolumeClaimCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapConfig,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
-        : mapConfig.entrySet()) {
-      final String volumeName = configs.getKey();
-
-      // Do not create Volumes for `OnDemand`.
-      final String claimName = configs.getValue()
-          .get(KubernetesConstants.VolumeConfigKeys.claimName);
-      if (claimName != null && 
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        volumes.add(Volumes.get().createPersistentVolumeClaim(claimName, 
volumeName));
-      }
-      volumeMounts.add(Volumes.get().createMount(volumeName, 
configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>emptyDir</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsEmptyDirCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.EmptyDir, volumeName, 
configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, 
configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>Host Path</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsHostPathCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.HostPath, volumeName, 
configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, 
configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for 
<code>NFS</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options 
on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option 
<code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsNFSCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> 
configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, 
configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, 
configs.getValue()));
-    }
-  }
-
-  /**
-   * Configures the Pod Spec and Heron container with <code>Volumes</code> and 
<code>Volume Mounts</code>.
-   * @param podSpec All generated <code>V1Volume</code> will be placed in the 
<code>Pod Spec</code>.
-   * @param executor All generated <code>V1VolumeMount</code> will be placed 
in the <code>Container</code>.
-   * @param volumes <code>Volumes</code> to be inserted in the Pod Spec.
-   * @param volumeMounts <code>Volumes Mounts</code> to be inserted in the 
Heron container.
-   */
-  @VisibleForTesting
-  protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec 
podSpec,
-      final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount> 
volumeMounts) {
-
-    // Deduplicate on Names with Persistent Volume Claims taking precedence.
-
-    KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
-        new KubernetesUtils.V1ControllerUtils<>();
-    podSpec.setVolumes(
-        utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
-            Comparator.comparing(V1Volume::getName),
-            "Pod with Volumes"));
-
-    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
-        new KubernetesUtils.V1ControllerUtils<>();
-    executor.setVolumeMounts(
-        utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
-            Comparator.comparing(V1VolumeMount::getName),
-            "Heron container with Volume Mounts"));
-  }
-
   /**
    * Removes all Persistent Volume Claims associated with a specific topology, 
if they exist.
    * It looks for the following:
diff --git 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
index d713d823466..ec1be4e87a9 100644
--- 
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
+++ 
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
@@ -19,14 +19,8 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -36,37 +30,16 @@ import org.junit.runner.RunWith;
 import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.packing.Resource;
 
-import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
-import io.kubernetes.client.openapi.models.V1Container;
-import io.kubernetes.client.openapi.models.V1ContainerBuilder;
-import io.kubernetes.client.openapi.models.V1ContainerPort;
-import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1EnvVarSource;
-import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
-import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
-import io.kubernetes.client.openapi.models.V1ResourceRequirements;
-import io.kubernetes.client.openapi.models.V1Toleration;
-import io.kubernetes.client.openapi.models.V1Volume;
-import io.kubernetes.client.openapi.models.V1VolumeBuilder;
-import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
-import static 
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 
@@ -443,843 +416,4 @@ public class KubernetesShimTest {
     KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
     kubernetesShim.getPodTemplateLocation(true);
   }
-
-  @Test
-  public void testConfigureContainerPorts() {
-    final String portNamekept = "random-port-to-be-kept";
-    final int portNumberkept = 1111;
-    final int numInstances = 3;
-    final List<V1ContainerPort> expectedPortsBase =
-        Collections.unmodifiableList(KubernetesShim.getExecutorPorts());
-    final List<V1ContainerPort> debugPorts =
-        
Collections.unmodifiableList(KubernetesShim.getDebuggingPorts(numInstances));
-    final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
-        Arrays.asList(
-            new V1ContainerPort()
-                
.name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
-            new V1ContainerPort()
-                
.name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
-            new 
V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
-        )
-    );
-
-    // Null ports. This is the default case.
-    final V1Container inputContainerWithNullPorts = new 
V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, 
inputContainerWithNullPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container with null ports 
list",
-        CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), 
expectedPortsBase));
-
-    // Empty ports.
-    final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
-        .withPorts(new LinkedList<>())
-        .build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, 
inputContainerWithEmptyPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container with empty 
ports list",
-        CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), 
expectedPortsBase));
-
-    // Port overriding.
-    final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
-    final V1Container inputContainerWithPorts = new V1ContainerBuilder()
-        .withPorts(inputPorts)
-        .build();
-    final List<V1ContainerPort> expectedPortsOverriding = new 
LinkedList<>(expectedPortsBase);
-    expectedPortsOverriding
-        .add(new 
V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
-
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, 
inputContainerWithPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container should be 
overwritten.",
-        CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), 
expectedPortsOverriding));
-
-    // Port overriding with debug ports.
-    final List<V1ContainerPort> inputPortsWithDebug = new 
LinkedList<>(debugPorts);
-    inputPortsWithDebug.addAll(inputPortsBase);
-    final V1Container inputContainerWithDebug = new V1ContainerBuilder()
-        .withPorts(inputPortsWithDebug)
-        .build();
-    final List<V1ContainerPort> expectedPortsDebug = new 
LinkedList<>(expectedPortsBase);
-    expectedPortsDebug.add(new 
V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
-    expectedPortsDebug.addAll(debugPorts);
-
-    v1ControllerWithPodTemplate.configureContainerPorts(
-        true, numInstances, inputContainerWithDebug);
-    Assert.assertTrue("Server and/or shell with debug PORTS for container 
should be overwritten.",
-        CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), 
expectedPortsDebug));
-  }
-
-  @Test
-  public void testConfigureContainerEnvVars() {
-    final List<V1EnvVar> heronEnvVars =
-        Collections.unmodifiableList(KubernetesShim.getExecutorEnvVars());
-    final V1EnvVar additionEnvVar = new V1EnvVar()
-        .name("env-variable-to-be-kept")
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath("env-variable-was-kept")));
-    final List<V1EnvVar> inputEnvVars = Arrays.asList(
-        new V1EnvVar()
-            .name(KubernetesConstants.ENV_HOST)
-            .valueFrom(new V1EnvVarSource()
-                .fieldRef(new V1ObjectFieldSelector()
-                    .fieldPath("env-host-to-be-replaced"))),
-        new V1EnvVar()
-            .name(KubernetesConstants.ENV_POD_NAME)
-            .valueFrom(new V1EnvVarSource()
-                .fieldRef(new V1ObjectFieldSelector()
-                    .fieldPath("pod-name-to-be-replaced"))),
-        additionEnvVar
-    );
-
-    // Null env vars. This is the default case.
-    V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
-    
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars 
should match",
-        CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), 
heronEnvVars));
-
-    // Empty env vars.
-    V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
-        .withEnv(new LinkedList<>())
-        .build();
-    
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env 
Vars should match",
-        CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), 
heronEnvVars));
-
-    // Env Var overriding.
-    final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
-    expectedOverriding.add(additionEnvVar);
-    V1Container containerWithEnvVars = new V1ContainerBuilder()
-        .withEnv(inputEnvVars)
-        .build();
-    
v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars 
should be overridden",
-        CollectionUtils.containsAll(containerWithEnvVars.getEnv(), 
expectedOverriding));
-  }
-
-  @Test
-  public void testConfigureContainerResources() {
-    final boolean isExecutor = true;
-
-    final Resource resourceDefault = new Resource(
-        9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000));
-    final Resource resourceCustom = new Resource(
-        4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000));
-
-    final Quantity defaultRAM = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceDefault.getRam()));
-    final Quantity defaultCPU = Quantity.fromString(
-        Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 
3)));
-    final Quantity customRAM = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceCustom.getRam()));
-    final Quantity customCPU = Quantity.fromString(
-        Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 
3)));
-    final Quantity customDisk = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceCustom.getDisk()));
-
-    final Config configNoLimit = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
-        .build();
-    final Config configWithLimit = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, 
"EQUAL_TO_LIMIT")
-        .build();
-
-    final V1ResourceRequirements expectDefaultRequirements = new 
V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
-        .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
-
-    final V1ResourceRequirements expectCustomRequirements = new 
V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
-        .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
-        .putLimitsItem("disk", customDisk);
-
-    final V1ResourceRequirements customRequirements = new 
V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
-        .putLimitsItem(KubernetesConstants.CPU, customCPU)
-        .putLimitsItem("disk", customDisk);
-
-    // Default. Null resources.
-    V1Container containerNull = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerNull, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Default LIMITS should be set in container with null 
LIMITS",
-        containerNull.getResources().getLimits().entrySet()
-            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
-
-    // Empty resources.
-    V1Container containerEmpty = new 
V1ContainerBuilder().withNewResources().endResources().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerEmpty, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Default LIMITS should be set in container with empty 
LIMITS",
-        containerNull.getResources().getLimits().entrySet()
-            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
-
-    // Custom resources.
-    V1Container containerCustom = new V1ContainerBuilder()
-        .withResources(customRequirements)
-        .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerCustom, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Custom LIMITS should be set in container with custom 
LIMITS",
-        containerCustom.getResources().getLimits().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-
-    // Custom resources with request.
-    V1Container containerRequests = new V1ContainerBuilder()
-        .withResources(customRequirements)
-        .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerRequests, configWithLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Custom LIMITS should be set in container with custom 
LIMITS and REQUEST",
-        containerRequests.getResources().getLimits().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-    Assert.assertTrue("Custom REQUEST should be set in container with custom 
LIMITS and REQUEST",
-        containerRequests.getResources().getRequests().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-  }
-
-  @Test
-  public void testConfigureContainerResourcesCLI() {
-    final boolean isExecutor = true;
-    final String customLimitMEMStr = "120Gi";
-    final String customLimitCPUStr = "5";
-    final String customRequestMEMStr = "100Mi";
-    final String customRequestCPUStr = "4";
-
-    final Resource resources = new Resource(
-        6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400));
-
-    final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr);
-    final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr);
-    final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr);
-    final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr);
-
-    final Config config = Config.newBuilder()
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
-                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), 
customLimitCPUStr)
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
-                + KubernetesConstants.MEMORY, 
KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr)
-        
.put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
-                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), 
customRequestCPUStr)
-        
.put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
-                + KubernetesConstants.MEMORY, 
KubernetesConstants.EXECUTOR_NAME),
-            customRequestMEMStr)
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, 
"EQUAL_TO_LIMIT")
-        .build();
-
-    final V1Container expected = new V1ContainerBuilder()
-        .withNewResources()
-          .addToLimits(KubernetesConstants.CPU, customLimitCPU)
-          .addToLimits(KubernetesConstants.MEMORY, customLimitMEM)
-          .addToRequests(KubernetesConstants.CPU, customRequestCPU)
-          .addToRequests(KubernetesConstants.MEMORY, customRequestMEM)
-        .endResources()
-        .build();
-
-    final V1Container actual = new V1Container();
-    v1ControllerWithPodTemplate.configureContainerResources(actual, config, 
resources, isExecutor);
-    Assert.assertEquals("Container Resources are set from CLI.", expected, 
actual);
-  }
-
-  @Test
-  public void testMountVolumeIfPresent() {
-    final String pathDefault = "config-host-volume-path";
-    final String pathNameDefault = "config-host-volume-name";
-    final Config configWithVolumes = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, 
pathNameDefault)
-        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, 
pathDefault)
-        .build();
-    final KubernetesShim controllerWithMounts = new 
KubernetesShim(configWithVolumes, RUNTIME);
-    final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
-        .withName(pathNameDefault)
-        .withMountPath(pathDefault)
-        .build();
-    final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
-        .withName("custom-volume-mount")
-        .withMountPath("should-be-kept")
-        .build();
-
-    final List<V1VolumeMount> expectedMountsDefault = 
Collections.singletonList(volumeDefault);
-    final List<V1VolumeMount> expectedMountsCustom = 
Arrays.asList(volumeCustom, volumeDefault);
-    final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
-        new V1VolumeMountBuilder()
-            .withName(pathNameDefault)
-            .withMountPath("should-be-replaced")
-            .build(),
-        volumeCustom
-    );
-
-    // No Volume Mounts set.
-    KubernetesShim controllerDoNotSetMounts =
-        new KubernetesShim(Config.newBuilder().build(), RUNTIME);
-    V1Container containerNoSetMounts = new V1Container();
-    controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
-    Assert.assertNull(containerNoSetMounts.getVolumeMounts());
-
-    // Default. Null Volume Mounts.
-    V1Container containerNull = new V1ContainerBuilder().build();
-    controllerWithMounts.mountVolumeIfPresent(containerNull);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with 
null VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsDefault, 
containerNull.getVolumeMounts()));
-
-    // Empty Volume Mounts.
-    V1Container containerEmpty = new V1ContainerBuilder()
-        .withVolumeMounts(new LinkedList<>())
-        .build();
-    controllerWithMounts.mountVolumeIfPresent(containerEmpty);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with 
empty VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsDefault, 
containerEmpty.getVolumeMounts()));
-
-    // Custom Volume Mounts.
-    V1Container containerCustom = new V1ContainerBuilder()
-        .withVolumeMounts(volumeMountsCustomList)
-        .build();
-    controllerWithMounts.mountVolumeIfPresent(containerCustom);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with 
custom VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsCustom, 
containerCustom.getVolumeMounts()));
-  }
-
-  @Test
-  public void testConfigureTolerations() {
-    final V1Toleration keptToleration = new V1Toleration()
-        .key("kept toleration")
-        .operator("Some Operator")
-        .effect("Some Effect")
-        .tolerationSeconds(5L);
-    final List<V1Toleration> expectedTolerationBase =
-        Collections.unmodifiableList(KubernetesShim.getTolerations());
-    final List<V1Toleration> inputTolerationsBase = 
Collections.unmodifiableList(
-        Arrays.asList(
-            new V1Toleration()
-                
.key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
-            new V1Toleration()
-                
.key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
-            keptToleration
-        )
-    );
-
-    // Null Tolerations. This is the default case.
-    final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
-    v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
-    Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to 
Heron's defaults",
-        CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
-            expectedTolerationBase));
-
-    // Empty Tolerations.
-    final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
-        .withTolerations(new LinkedList<>())
-        .build();
-    
v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
-    Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to 
Heron's defaults",
-        
CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
-            expectedTolerationBase));
-
-    // Toleration overriding.
-    final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
-        .withTolerations(inputTolerationsBase)
-        .build();
-    final List<V1Toleration> expectedTolerationsOverriding =
-        new LinkedList<>(expectedTolerationBase);
-    expectedTolerationsOverriding.add(keptToleration);
-
-    v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
-    Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with 
Heron's defaults",
-        CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
-            expectedTolerationsOverriding));
-  }
-
-  @Test
-  public void testCreatePersistentVolumeClaims() {
-    final String topologyName = "topology-name";
-    final String volumeNameOne = "volume-name-one";
-    final String volumeNameTwo = "volume-name-two";
-    final String volumeNameStatic = "volume-name-static";
-    final String claimNameOne = "OnDemand";
-    final String claimNameTwo = "claim-name-two";
-    final String claimNameStatic = "OnDEmaND";
-    final String storageClassName = "storage-class-name";
-    final String sizeLimit = "555Gi";
-    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
-    final String accessModes = "ReadOnlyMany";
-    final String volumeMode = "VolumeMode";
-    final String path = "/path/to/mount/";
-    final String subPath = "/sub/path/to/mount/";
-    final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
-        ImmutableMap.of(
-            volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameOne);
-                put(VolumeConfigKeys.storageClassName, storageClassName);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModesList);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-              }
-            },
-            volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameTwo);
-                put(VolumeConfigKeys.storageClassName, storageClassName);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModes);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-                put(VolumeConfigKeys.subPath, subPath);
-              }
-            },
-            volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameStatic);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModes);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-                put(VolumeConfigKeys.subPath, subPath);
-              }
-            }
-        );
-
-    final V1PersistentVolumeClaim claimOne = new 
V1PersistentVolumeClaimBuilder()
-        .withNewMetadata()
-          .withName(volumeNameOne)
-          
.withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
-        .endMetadata()
-        .withNewSpec()
-          .withStorageClassName(storageClassName)
-          .withAccessModes(Arrays.asList(accessModesList.split(",")))
-          .withVolumeMode(volumeMode)
-          .withNewResources()
-            .addToRequests("storage", new Quantity(sizeLimit))
-          .endResources()
-        .endSpec()
-        .build();
-
-    final V1PersistentVolumeClaim claimStatic = new 
V1PersistentVolumeClaimBuilder()
-        .withNewMetadata()
-          .withName(volumeNameStatic)
-          
.withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
-        .endMetadata()
-        .withNewSpec()
-          .withStorageClassName("")
-          .withAccessModes(Collections.singletonList(accessModes))
-          .withVolumeMode(volumeMode)
-          .withNewResources()
-            .addToRequests("storage", new Quantity(sizeLimit))
-          .endResources()
-        .endSpec()
-        .build();
-
-    final List<V1PersistentVolumeClaim> expectedClaims =
-        new LinkedList<>(Arrays.asList(claimOne, claimStatic));
-
-    final List<V1PersistentVolumeClaim> actualClaims =
-        v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
-
-    Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), 
actualClaims.size());
-    Assert.assertTrue(expectedClaims.containsAll(actualClaims));
-  }
-
-  @Test
-  public void testCreatePersistentVolumeClaimVolumesAndMounts() {
-    final String volumeNameOne = "VolumeNameONE";
-    final String volumeNameTwo = "VolumeNameTWO";
-    final String claimNameOne = "claim-name-one";
-    final String claimNameTwo = "OnDemand";
-    final String mountPathOne = "/mount/path/ONE";
-    final String mountPathTwo = "/mount/path/TWO";
-    final String mountSubPathTwo = "/mount/sub/path/TWO";
-    Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
-        ImmutableMap.of(
-            volumeNameOne, ImmutableMap.of(
-                VolumeConfigKeys.claimName, claimNameOne,
-                VolumeConfigKeys.path, mountPathOne),
-            volumeNameTwo, ImmutableMap.of(
-                VolumeConfigKeys.claimName, claimNameTwo,
-                VolumeConfigKeys.path, mountPathTwo,
-                VolumeConfigKeys.subPath, mountSubPathTwo)
-        );
-    final V1Volume volumeOne = new V1VolumeBuilder()
-        .withName(volumeNameOne)
-        .withNewPersistentVolumeClaim()
-          .withClaimName(claimNameOne)
-        .endPersistentVolumeClaim()
-        .build();
-    final V1Volume volumeTwo = new V1VolumeBuilder()
-        .withName(volumeNameTwo)
-        .withNewPersistentVolumeClaim()
-          .withClaimName(claimNameTwo)
-        .endPersistentVolumeClaim()
-        .build();
-    final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder()
-        .withName(volumeNameOne)
-        .withMountPath(mountPathOne)
-        .build();
-    final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder()
-        .withName(volumeNameTwo)
-        .withMountPath(mountPathTwo)
-        .withSubPath(mountSubPathTwo)
-        .build();
-
-    // Test case container.
-    // Input: Map of Volume configurations.
-    // Output: The expected lists of Volumes and Volume Mounts.
-    final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
-        Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new 
LinkedList<>();
-
-    // Default case: No PVC provided.
-    testCases.add(new TestTuple<>("Generated an empty list of Volumes", new 
HashMap<>(),
-        new Pair<>(new LinkedList<>(), new LinkedList<>())));
-
-    // PVC Provided.
-    final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
-        new Pair<>(
-            new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
-            new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
-    testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
-        new Pair<>(expectedFull.first, expectedFull.second)));
-
-    // Testing loop.
-    for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
-             Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
-      List<V1Volume> actualVolume = new LinkedList<>();
-      List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
-      
v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
-          actualVolume, actualVolumeMount);
-
-      Assert.assertTrue(testCase.description,
-          (testCase.expected.first).containsAll(actualVolume));
-      Assert.assertTrue(testCase.description + " Mounts",
-          (testCase.expected.second).containsAll(actualVolumeMount));
-    }
-  }
-
-  @Test
-  public void testConfigurePodWithVolumesAndMountsFromCLI() {
-    final String volumeNameClashing = "clashing-volume";
-    final String volumeMountNameClashing = "original-volume-mount";
-    V1Volume baseVolume = new V1VolumeBuilder()
-        .withName(volumeNameClashing)
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Original Base Claim Name")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
-        .withName(volumeMountNameClashing)
-        .withMountPath("/original/mount/path")
-        .build();
-    V1Volume clashingVolume = new V1VolumeBuilder()
-        .withName(volumeNameClashing)
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Clashing Claim Replaced")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
-        .withName(volumeMountNameClashing)
-        .withMountPath("/clashing/mount/path")
-        .build();
-    V1Volume secondaryVolume = new V1VolumeBuilder()
-        .withName("secondary-volume")
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Original Secondary Claim Name")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
-        .withName("secondary-volume-mount")
-        .withMountPath("/secondary/mount/path")
-        .build();
-
-    // Test case container.
-    // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List 
of Volumes
-    // [3] List of Volume Mounts.
-    // Output: The expected <V1PodSpec> and <V1Container>.
-    final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = 
new LinkedList<>();
-
-    // No Persistent Volume Claim.
-    final V1PodSpec podSpecEmptyCase = new 
V1PodSpecBuilder().withVolumes(baseVolume).build();
-    final V1Container executorEmptyCase =
-        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
-    final V1PodSpec expectedEmptyPodSpec = new 
V1PodSpecBuilder().withVolumes(baseVolume).build();
-    final V1Container expectedEmptyExecutor =
-        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
-
-    testCases.add(new TestTuple<>("Empty",
-        new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), 
new LinkedList<>()},
-        new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
-
-    // Non-clashing Persistent Volume Claim.
-    final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
-        .withVolumes(baseVolume)
-        .build();
-    final V1Container executorNoClashCase = new V1ContainerBuilder()
-        .withVolumeMounts(baseVolumeMount)
-        .build();
-    final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
-        .addToVolumes(baseVolume)
-        .addToVolumes(secondaryVolume)
-        .build();
-    final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
-        .addToVolumeMounts(baseVolumeMount)
-        .addToVolumeMounts(secondaryVolumeMount)
-        .build();
-
-    testCases.add(new TestTuple<>("No Clash",
-        new Object[]{podSpecNoClashCase, executorNoClashCase,
-            Collections.singletonList(secondaryVolume),
-            Collections.singletonList(secondaryVolumeMount)},
-        new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
-
-    // Clashing Persistent Volume Claim.
-    final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
-        .withVolumes(baseVolume)
-        .build();
-    final V1Container executorClashCase = new V1ContainerBuilder()
-        .withVolumeMounts(baseVolumeMount)
-        .build();
-    final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
-        .addToVolumes(clashingVolume)
-        .addToVolumes(secondaryVolume)
-        .build();
-    final V1Container expectedClashExecutor = new V1ContainerBuilder()
-        .addToVolumeMounts(clashingVolumeMount)
-        .addToVolumeMounts(secondaryVolumeMount)
-        .build();
-
-    testCases.add(new TestTuple<>("Clashing",
-        new Object[]{podSpecClashCase, executorClashCase,
-            Arrays.asList(clashingVolume, secondaryVolume),
-            Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
-        new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
-
-    // Testing loop.
-    for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : 
testCases) {
-      v1ControllerWithPodTemplate
-          .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) 
testCase.input[0],
-              (V1Container) testCase.input[1], (List<V1Volume>) 
testCase.input[2],
-              (List<V1VolumeMount>) testCase.input[3]);
-
-      Assert.assertEquals("Pod Specs match " + testCase.description,
-          testCase.input[0], testCase.expected.first);
-      Assert.assertEquals("Executors match " + testCase.description,
-          testCase.input[1], testCase.expected.second);
-    }
-  }
-
-  @Test
-  public void testSetShardIdEnvironmentVariableCommand() {
-
-    List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
-
-    testCases.add(new TestTuple<>("Executor command is set correctly",
-        true, "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo 
shardId=${SHARD_ID}"));
-    testCases.add(new TestTuple<>("Manager command is set correctly",
-        false, "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}"));
-
-    for (TestTuple<Boolean, String> testCase : testCases) {
-      Assert.assertEquals(testCase.description, testCase.expected,
-          
v1ControllerWithPodTemplate.setShardIdEnvironmentVariableCommand(testCase.input));
-    }
-  }
-
-  @Test
-  public void testCreateResourcesRequirement() {
-    final String managerCpuLimit = "3000m";
-    final String managerMemLimit = "256Gi";
-    final Quantity memory = Quantity.fromString(managerMemLimit);
-    final Quantity cpu = Quantity.fromString(managerCpuLimit);
-    final List<TestTuple<Map<String, String>, Map<String, Quantity>>> 
testCases =
-        new LinkedList<>();
-
-    // No input.
-    Map<String, String> inputEmpty = new HashMap<>();
-    testCases.add(new TestTuple<>("Empty input.", inputEmpty, new 
HashMap<>()));
-
-    // Only memory.
-    Map<String, String> inputMemory = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.MEMORY, managerMemLimit);
-      }
-    };
-    Map<String, Quantity> expectedMemory = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.MEMORY, memory);
-      }
-    };
-    testCases.add(new TestTuple<>("Only memory input.", inputMemory, 
expectedMemory));
-
-    // Only CPU.
-    Map<String, String> inputCPU = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedCPU = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Only CPU input.", inputCPU, expectedCPU));
-
-    // CPU and memory.
-    Map<String, String> inputMemoryCPU = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.MEMORY, managerMemLimit);
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedMemoryCPU = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.MEMORY, memory);
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Memory and CPU input.", inputMemoryCPU, 
expectedMemoryCPU));
-
-    // Invalid.
-    Map<String, String> inputInvalid = new HashMap<String, String>() {
-      {
-        put("invalid input", "will not be ignored");
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedInvalid = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Invalid input.", inputInvalid, 
expectedInvalid));
-
-    // Test loop.
-    for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : 
testCases) {
-      Map<String, Quantity> actual =
-          v1ControllerPodTemplate.createResourcesRequirement(testCase.input);
-      Assert.assertEquals(testCase.description, testCase.expected, actual);
-    }
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsEmptyDirCLI() {
-    final String volumeName = "volume-name-empty-dir";
-    final String medium = "Memory";
-    final String sizeLimit = "1Gi";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // Empty Dir.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.sizeLimit, sizeLimit);
-            put(VolumeConfigKeys.medium, "Memory");
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewEmptyDir()
-              .withMedium(medium)
-              .withNewSizeLimit(sizeLimit)
-            .endEmptyDir()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-              .withMountPath(path)
-              .withSubPath(subPath)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, 
actualVolumes, actualMounts);
-    Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, 
actualVolumes);
-    Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, 
actualMounts);
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsHostPathCLI() {
-    final String volumeName = "volume-name-host-path";
-    final String type = "DirectoryOrCreate";
-    final String pathOnHost = "path.on.host";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // Host Path.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.type, type);
-            put(VolumeConfigKeys.pathOnHost, pathOnHost);
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewHostPath()
-              .withNewType(type)
-              .withNewPath(pathOnHost)
-            .endHostPath()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-              .withMountPath(path)
-              .withSubPath(subPath)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, 
actualVolumes, actualMounts);
-    Assert.assertEquals("Host Path Volume populated", expectedVolumes, 
actualVolumes);
-    Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, 
actualMounts);
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsNFSCLI() {
-    final String volumeName = "volume-name-nfs";
-    final String server = "nfs.server.address";
-    final String pathOnNFS = "path.on.host";
-    final String readOnly = "true";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // NFS.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.server, server);
-            put(VolumeConfigKeys.readOnly, readOnly);
-            put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewNfs()
-              .withServer(server)
-              .withPath(pathOnNFS)
-              .withReadOnly(Boolean.parseBoolean(readOnly))
-            .endNfs()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-            .withMountPath(path)
-            .withSubPath(subPath)
-            .withReadOnly(true)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, 
actualMounts);
-    Assert.assertEquals("NFS Volume populated", expectedVolumes, 
actualVolumes);
-    Assert.assertEquals("NFS Volume Mount populated", expectedMounts, 
actualMounts);
-  }
 }

Reply via email to