This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5a1b981 [Heron-3723] Add support for Persistent Volumes for stateful
storage (#3725)
5a1b981 is described below
commit 5a1b9814133d3767aeee7e796650cbab031365d5
Author: Saad Ur Rahman <[email protected]>
AuthorDate: Tue Nov 30 00:20:34 2021 -0500
[Heron-3723] Add support for Persistent Volumes for stateful storage (#3725)
Co-authored-by: Nicholas Nezis <[email protected]>
Co-authored-by: Josh Fischer <[email protected]>
Co-authored-by: zhangshaoning
<[email protected]>
Co-authored-by: Huijun Wu <[email protected]>
Co-authored-by: Huijun Wu <[email protected]>
---
deploy/kubernetes/general/apiserver.yaml | 1 +
deploy/kubernetes/helm/templates/tools.yaml | 11 +
deploy/kubernetes/helm/values.yaml.template | 3 +
deploy/kubernetes/minikube/apiserver.yaml | 1 +
.../scheduler/kubernetes/KubernetesConstants.java | 15 +-
.../scheduler/kubernetes/KubernetesContext.java | 85 ++++++
.../scheduler/kubernetes/KubernetesUtils.java | 23 ++
.../heron/scheduler/kubernetes/V1Controller.java | 239 ++++++++++++++++-
.../kubernetes/KubernetesContextTest.java | 137 +++++++++-
.../scheduler/kubernetes/V1ControllerTest.java | 289 +++++++++++++++++++++
.../schedulers-k8s-persistent-volume-claims.md | 257 ++++++++++++++++++
website2/website/sidebars.json | 1 +
12 files changed, 1058 insertions(+), 4 deletions(-)
diff --git a/deploy/kubernetes/general/apiserver.yaml
b/deploy/kubernetes/general/apiserver.yaml
index 33c9533..094715a 100644
--- a/deploy/kubernetes/general/apiserver.yaml
+++ b/deploy/kubernetes/general/apiserver.yaml
@@ -92,6 +92,7 @@ spec:
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heron
-D heron.kubernetes.pod.template.configmap.disabled=false
+ -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
---
apiVersion: v1
diff --git a/deploy/kubernetes/helm/templates/tools.yaml
b/deploy/kubernetes/helm/templates/tools.yaml
index 08b0707..c776e49 100644
--- a/deploy/kubernetes/helm/templates/tools.yaml
+++ b/deploy/kubernetes/helm/templates/tools.yaml
@@ -159,6 +159,7 @@ spec:
{{- end }}
-D heron.kubernetes.resource.request.mode={{
.Values.topologyResourceRequestMode }}
-D heron.kubernetes.pod.template.configmap.disabled={{
.Values.disablePodTemplates }}
+ -D heron.kubernetes.persistent.volume.claims.cli.disabled={{
.Values.disablePersistentVolumeMountsCLI }}
envFrom:
- configMapRef:
name: {{ .Release.Name }}-tools-config
@@ -321,3 +322,13 @@ rules:
verbs:
- get
- list
+- apiGroups:
+ - ""
+ resources:
+ - persistentvolumeclaims
+ verbs:
+ - create
+ - delete
+ - get
+ - list
+ - deletecollection
diff --git a/deploy/kubernetes/helm/values.yaml.template
b/deploy/kubernetes/helm/values.yaml.template
index dfbdd1f..870db57 100644
--- a/deploy/kubernetes/helm/values.yaml.template
+++ b/deploy/kubernetes/helm/values.yaml.template
@@ -77,6 +77,9 @@ packing: RoundRobin # ResourceCompliantRR, FirstFitDecreasing
# Support for ConfigMap mounted PodTemplates
disablePodTemplates: false
+# Support for Dynamic Persistent Volume Mounts from CLI input
+disablePersistentVolumeMountsCLI: false
+
# Number of replicas for storage bookies, memory and storage requirements
bookieReplicas: 3
bookieCpuMin: 100m
diff --git a/deploy/kubernetes/minikube/apiserver.yaml
b/deploy/kubernetes/minikube/apiserver.yaml
index 8c08cc9..53d879a 100644
--- a/deploy/kubernetes/minikube/apiserver.yaml
+++ b/deploy/kubernetes/minikube/apiserver.yaml
@@ -83,6 +83,7 @@ spec:
-D
heron.statefulstorage.classname=org.apache.heron.statefulstorage.dlog.DlogStorage
-D
heron.statefulstorage.dlog.namespace.uri=distributedlog://zookeeper:2181/heronbkdl
-D heron.kubernetes.pod.template.configmap.disabled=false
+ -D heron.kubernetes.persistent.volume.claims.cli.disabled=false
---
apiVersion: v1
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
index 5e7b19a..023d8bc 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java
@@ -52,6 +52,7 @@ public final class KubernetesConstants {
public static final String LABEL_APP = "app";
public static final String LABEL_APP_VALUE = "heron";
public static final String LABEL_TOPOLOGY = "topology";
+ public static final String LABEL_ON_DEMAND = "onDemand";
// prometheus annotation keys
public static final String ANNOTATION_PROMETHEUS_SCRAPE =
"prometheus.io/scrape";
@@ -88,11 +89,13 @@ public final class KubernetesConstants {
public static final String JOB_LINK =
"/api/v1/namespaces/kube-system/services/kubernetes-dashboard/proxy/#/pod";
-
public static final Pattern VALID_POD_NAME_REGEX =
Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
Pattern.CASE_INSENSITIVE);
+  // Lowercase RFC-1123 DNS subdomain: dot-separated labels of [a-z0-9-] which start and end
+  // with an alphanumeric. The dot is escaped exactly once ("\\." in Java source) so the regex
+  // engine sees a literal '.'; a doubled escape would instead require a backslash followed by
+  // any character and reject valid dotted names such as "fast.storage".
+  public static final Pattern VALID_LOWERCASE_RFC_1123_REGEX =
+      Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*");
+
public static final List<String> VALID_IMAGE_PULL_POLICIES =
Collections.unmodifiableList(
Arrays.asList(
"IfNotPresent",
@@ -107,4 +110,14 @@ public final class KubernetesConstants {
"node.kubernetes.io/unreachable"
)
);
+
+ enum VolumeClaimTemplateConfigKeys { // Matched by name (valueOf) against the OPTION token of CLI keys; renaming changes the accepted keys.
+ claimName, // Name of an existing claim to mount, or `OnDemand` to have a claim generated.
+ storageClassName, // Set as the storage class of a generated claim.
+ sizeLimit, // Becomes the "storage" resource request of a generated claim.
+ accessModes, // Comma-separated list; split before being set on a generated claim.
+ volumeMode, // Set as the volume mode of a generated claim.
+ path, // Mount path; added to the container's volume mount, not to the claim.
+ subPath, // Optional sub-path; added to the container's volume mount, not to the claim.
+ }
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 6d29b72..40b7618 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -23,7 +23,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Context;
@@ -109,6 +113,13 @@ public final class KubernetesContext extends Context {
public static final String KUBERNETES_POD_SECRET_KEY_REF_PREFIX =
"heron.kubernetes.pod.secretKeyRef.";
+ // Persistent Volume Claims
+ public static final String KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED =
+ "heron.kubernetes.persistent.volume.claims.cli.disabled";
+ //
heron.kubernetes.volumes.persistentVolumeClaim.VOLUME_NAME.OPTION=OPTION_VALUE
+ public static final String KUBERNETES_VOLUME_CLAIM_PREFIX =
+ "heron.kubernetes.volumes.persistentVolumeClaim.";
+
private KubernetesContext() {
}
@@ -211,6 +222,80 @@ public final class KubernetesContext extends Context {
return getConfigItemsByPrefix(config,
KUBERNETES_POD_SECRET_KEY_REF_PREFIX);
}
+  /**
+   * Reports whether configuring Persistent Volume Claims from the <code>CLI</code> is disabled.
+   * @param config Configuration from which the flag is read.
+   * @return <code>true</code> only when the flag is present and equals "true", ignoring case.
+   */
+  public static boolean getPersistentVolumeClaimDisabled(Config config) {
+    final String flag = config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED);
+    if (flag == null) {
+      return false;
+    }
+    return flag.equalsIgnoreCase("true");
+  }
+
+  /**
+   * Collects parameters from the <code>CLI</code> and generates a mapping between
+   * <code>Volumes</code> and their configuration <code>key-value</code> pairs.
+   * Keys are expected in the form
+   * <code>heron.kubernetes.volumes.persistentVolumeClaim.VOLUME_NAME.OPTION</code>.
+   * @param config Contains the configuration options collected from the <code>CLI</code>.
+   * @return A mapping between <code>Volumes</code> and their configuration
+   * <code>key-value</code> pairs. Will return an empty map if there are no Volume Claim
+   * Templates to be generated.
+   * @throws TopologySubmissionException if a key is missing its volume name or option, the
+   * option is not a known key, or a volume name, claim name (other than <code>OnDemand</code>)
+   * or storage class name does not match the lowercase RFC-1123 pattern.
+   */
+  public static Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
+      getVolumeClaimTemplates(Config config) {
+    // Log under this class so records point at the parser rather than the controller.
+    final Logger logger = Logger.getLogger(KubernetesContext.class.getName());
+
+    final Set<String> completeConfigParam =
+        getConfigKeys(config, KUBERNETES_VOLUME_CLAIM_PREFIX);
+    final int prefixLength = KUBERNETES_VOLUME_CLAIM_PREFIX.length();
+    final int volumeNameIdx = 0;
+    final int optionIdx = 1;
+    // A single matcher is reused for every name via reset().
+    final Matcher matcher = KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+    final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> volumes
+        = new HashMap<>();
+
+    try {
+      for (String param : completeConfigParam) {
+        // A key with no option token fails on the array access and an unknown option fails
+        // in valueOf; both are translated into a submission error by the catch below.
+        final String[] tokens = param.substring(prefixLength).split("\\.");
+        final String volumeName = tokens[volumeNameIdx];
+        final KubernetesConstants.VolumeClaimTemplateConfigKeys key =
+            KubernetesConstants.VolumeClaimTemplateConfigKeys.valueOf(tokens[optionIdx]);
+        final String value = config.getStringValue(param);
+
+        Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> volume =
+            volumes.get(volumeName);
+        if (volume == null) {
+          // Validate new Volume Names.
+          if (!matcher.reset(volumeName).matches()) {
+            throw new TopologySubmissionException(
+                String.format("Volume name `%s` does not match lowercase RFC-1123 pattern",
+                    volumeName));
+          }
+          volume = new HashMap<>();
+          volumes.put(volumeName, volume);
+        }
+
+        /* Validate Claim and Storage Class names.
+          [1] `claimNameNotOnDemand`: checks for a `claimName` which is not `OnDemand`.
+          [2] `storageClassName`: Check if it is the provided `option`.
+          Conditions [1] OR [2] are True, then...
+          [3] Check for a valid lowercase RFC-1123 pattern.
+        */
+        boolean claimNameNotOnDemand =
+            KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
+                && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
+        if ((claimNameNotOnDemand // [1]
+            || KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key)) // [2]
+            && !matcher.reset(value).matches()) { // [3]
+          throw new TopologySubmissionException(
+              String.format("Option `%s` value `%s` does not match lowercase RFC-1123 pattern",
+                  key, value));
+        }
+
+        volume.put(key, value);
+      }
+    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+      final String message = "Invalid Persistent Volume Claim CLI parameter provided";
+      // Attach the cause so the offending key or option name can be recovered from the logs.
+      logger.log(Level.CONFIG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+    return volumes;
+  }
+
static Set<String> getConfigKeys(Config config, String keyPrefix) {
Set<String> annotations = new HashSet<>();
for (String s : config.getKeySet()) {
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index a75e00c..2363d7a 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -122,4 +122,27 @@ final class KubernetesUtils {
}
}
}
+
+  /**
+   * Holder which pairs a described test input with its expected result, for use by the test
+   * runners in the Kubernetes Scheduler.
+   * @param <T1> Type of the test input.
+   * @param <T2> Type of the expected result.
+   */
+  static class TestTuple<T1, T2> {
+    public final String description;
+    public final T1 input;
+    public final T2 expected;
+
+    /**
+     * Creates a single test case.
+     * @param description Description of the test to be run.
+     * @param input Input handed to the code under test.
+     * @param expected Expected output from the test.
+     */
+    TestTuple(String description, T1 input, T2 expected) {
+      this.description = description;
+      this.input = input;
+      this.expected = expected;
+    }
+  }
}
diff --git
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index 305b5d2..e6fc819 100644
---
a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++
b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -63,6 +63,8 @@ import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1LabelSelector;
import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodTemplate;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -73,9 +75,12 @@ import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.openapi.models.V1Toleration;
import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
import io.kubernetes.client.util.PatchUtils;
import io.kubernetes.client.util.Yaml;
import okhttp3.Response;
@@ -94,6 +99,9 @@ public class V1Controller extends KubernetesController {
private final AppsV1Api appsClient;
private final CoreV1Api coreClient;
+ private Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>>
+ persistentVolumeClaimConfigs = null;
+
V1Controller(Config configuration, Config runtimeConfiguration) {
super(configuration, runtimeConfiguration);
@@ -130,6 +138,18 @@ public class V1Controller extends KubernetesController {
throw new TopologySubmissionException(e.getMessage());
}
+ // Get and then create Persistent Volume Claims from the CLI.
+ persistentVolumeClaimConfigs =
+ KubernetesContext.getVolumeClaimTemplates(getConfiguration());
+ if (KubernetesContext.getPersistentVolumeClaimDisabled(getConfiguration())
+ && !persistentVolumeClaimConfigs.isEmpty()) {
+ final String message =
+ String.format("Configuring Persistent Volume Claim from CLI is
disabled: '%s'",
+ topologyName);
+ LOG.log(Level.WARNING, message);
+ throw new TopologySubmissionException(message);
+ }
+
// find the max number of instances in a container so we can open
// enough ports if remote debugging is enabled.
int numberOfInstances = 0;
@@ -142,8 +162,9 @@ public class V1Controller extends KubernetesController {
appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null,
null, null);
} catch (ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology",
e);
- throw new TopologySubmissionException(e.getMessage());
+ final String message = String.format("Error creating topology: %s%n",
e.getResponseBody());
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
}
return true;
@@ -151,6 +172,7 @@ public class V1Controller extends KubernetesController {
@Override
boolean killTopology() {
+ removePersistentVolumeClaims();
deleteStatefulSet();
deleteService();
return true;
@@ -424,6 +446,9 @@ public class V1Controller extends KubernetesController {
statefulSet.setSpec(statefulSetSpec);
+ statefulSetSpec.setVolumeClaimTemplates(
+ createPersistentVolumeClaims(persistentVolumeClaimConfigs));
+
return statefulSet;
}
@@ -505,6 +530,10 @@ public class V1Controller extends KubernetesController {
containers.add(executorContainer);
}
+ if (!persistentVolumeClaimConfigs.isEmpty()) {
+ configurePodWithPersistentVolumeClaimVolumesAndMounts(podSpec,
executorContainer);
+ }
+
configureExecutorContainer(executorCommand, resource, numberOfInstances,
executorContainer);
podSpec.setContainers(containers);
@@ -852,6 +881,212 @@ public class V1Controller extends KubernetesController {
}
/**
+   * Builds the <code>Persistent Volume Claim Templates</code> for every dynamically backed
+   * <code>Volume</code> in the supplied options. A <code>Volume</code> whose
+   * <code>claimName</code> is set to anything other than <code>OnDemand</code> is skipped.
+   * @param mapOfOpts <code>Volume</code> name to configuration <code>key-value</code> mappings.
+   * @return List of <code>Persistent Volume Claims</code>, one per dynamically backed volume.
+   */
+  @VisibleForTesting
+  protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
+      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> mapOfOpts) {
+
+    final List<V1PersistentVolumeClaim> claims = new LinkedList<>();
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> entry
+        : mapOfOpts.entrySet()) {
+      final Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> options =
+          entry.getValue();
+
+      // A missing claim name, or `OnDemand`, marks a volume which needs a generated claim.
+      final String requestedClaim =
+          options.get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+      final boolean isDynamic = requestedClaim == null
+          || KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(requestedClaim);
+
+      if (isDynamic) {
+        final V1PersistentVolumeClaim claim = new V1PersistentVolumeClaimBuilder()
+            .withNewMetadata()
+              .withName(entry.getKey())
+              .withLabels(getPersistentVolumeClaimLabels(getTopologyName()))
+            .endMetadata()
+            .withNewSpec()
+            .endSpec()
+            .build();
+
+        for (Map.Entry<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> option
+            : options.entrySet()) {
+          applyPersistentVolumeClaimOption(claim, option.getKey(), option.getValue());
+        }
+        claims.add(claim);
+      }
+    }
+    return claims;
+  }
+
+  /**
+   * Copies a single configuration option onto the spec of a <code>Persistent Volume Claim</code>.
+   * Options which only concern the container mount or claim selection are accepted and ignored.
+   * @param claim Claim whose spec is updated.
+   * @param key Option being applied.
+   * @param optionValue Value supplied for the option.
+   */
+  private static void applyPersistentVolumeClaimOption(
+      final V1PersistentVolumeClaim claim,
+      final KubernetesConstants.VolumeClaimTemplateConfigKeys key,
+      final String optionValue) {
+    switch (key) {
+      case storageClassName:
+        claim.getSpec().setStorageClassName(optionValue);
+        break;
+      case sizeLimit:
+        claim.getSpec().setResources(
+            new V1ResourceRequirements()
+                .putRequestsItem("storage", new Quantity(optionValue)));
+        break;
+      case accessModes:
+        claim.getSpec().setAccessModes(Arrays.asList(optionValue.split(",")));
+        break;
+      case volumeMode:
+        claim.getSpec().setVolumeMode(optionValue);
+        break;
+      // Valid ignored options not used in a PVC.
+      case path: case subPath: case claimName:
+        break;
+      default:
+        throw new TopologySubmissionException(
+            String.format("Invalid Persistent Volume Claim type option for '%s'", key));
+    }
+  }
+
+  /**
+   * Builds the <code>Volumes</code> and <code>Volume Mounts</code> described by the Persistent
+   * Volume Claim options. Every entry yields a <code>Volume Mount</code>; a <code>Volume</code>
+   * is produced only for entries whose <code>claimName</code> is set to something other than
+   * <code>OnDemand</code>.
+   * @param mapConfig Mapping of <code>Volumes</code> to <code>key-value</code> configuration pairs.
+   * @return A pair of configured lists of <code>V1Volume</code> and <code>V1VolumeMount</code>.
+   */
+  @VisibleForTesting
+  protected Pair<List<V1Volume>, List<V1VolumeMount>> createPersistentVolumeClaimVolumesAndMounts(
+      final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> mapConfig) {
+    final List<V1Volume> volumes = new LinkedList<>();
+    final List<V1VolumeMount> mounts = new LinkedList<>();
+
+    for (Map.Entry<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> entry
+        : mapConfig.entrySet()) {
+      final String name = entry.getKey();
+      final Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String> options =
+          entry.getValue();
+
+      // Every volume must state where it is mounted.
+      final String mountPath =
+          options.get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
+      if (mountPath == null || mountPath.isEmpty()) {
+        throw new TopologySubmissionException(
+            String.format("A mount path is required and missing from '%s'", name));
+      }
+
+      // Only a claim name other than `OnDemand` gets an explicit Volume.
+      final String claim =
+          options.get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+      final boolean onDemand =
+          claim == null || KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claim);
+      if (!onDemand) {
+        volumes.add(
+            new V1VolumeBuilder()
+                .withName(name)
+                .withNewPersistentVolumeClaim()
+                  .withClaimName(claim)
+                .endPersistentVolumeClaim()
+                .build());
+      }
+
+      final V1VolumeMountBuilder mount = new V1VolumeMountBuilder()
+          .withName(name)
+          .withMountPath(mountPath);
+      final String mountSubPath =
+          options.get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
+      if (mountSubPath != null && !mountSubPath.isEmpty()) {
+        mount.withSubPath(mountSubPath);
+      }
+      mounts.add(mount.build());
+    }
+    return new Pair<>(volumes, mounts);
+  }
+
+ /**
+ * Generates the <code>Volumes</code> and <code>Volume Mounts</code> for the configured
+ * Persistent Volume Claims and merges them into the supplied Pod Spec and container.
+ * The merge replaces the existing lists on both objects with the merged results, with
+ * entries compared by name.
+ * @param podSpec All generated <code>V1Volume</code> will be placed in the <code>Pod Spec</code>.
+ * @param executor All generated <code>V1VolumeMount</code> will be placed in the
+ * <code>Container</code>.
+ */
+ @VisibleForTesting
+ protected void configurePodWithPersistentVolumeClaimVolumesAndMounts(final
V1PodSpec podSpec,
+ final
V1Container executor) {
+ Pair<List<V1Volume>, List<V1VolumeMount>> volumesAndMounts =
+
createPersistentVolumeClaimVolumesAndMounts(persistentVolumeClaimConfigs);
+
+ // Deduplicate on names; the Persistent Volume Claim entries are passed first and are stated to take precedence (mergeListsDedupe is defined elsewhere - confirm).
+
+ KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
+ new KubernetesUtils.V1ControllerUtils<>();
+ executor.setVolumeMounts(
+ utilsMounts.mergeListsDedupe(volumesAndMounts.second,
executor.getVolumeMounts(),
+ Comparator.comparing(V1VolumeMount::getName),
+ "Executor and Persistent Volume Claim Volume Mounts"));
+
+ KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
+ new KubernetesUtils.V1ControllerUtils<>();
+ podSpec.setVolumes(
+ utilsVolumes.mergeListsDedupe(volumesAndMounts.first,
podSpec.getVolumes(),
+ Comparator.comparing(V1Volume::getName),
+ "Pod and Persistent Volume Claim Volumes"));
+ }
+
+ /**
+ * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
+ * The claims are selected in the topology's namespace by the labels produced by
+ * <code>getPersistentVolumeClaimLabels</code>, joined as comma separated
+ * <code>key=value</code> pairs. It looks for the following:
+ * metadata:
+ *   labels:
+ *     topology: <code>topology-name</code>
+ *     onDemand: <code>true</code>
+ * @throws TopologyRuntimeManagementException if the delete request raises an
+ * <code>ApiException</code>; a manual clean-up of the claims is then required.
+ */
+ private void removePersistentVolumeClaims() {
+ final String topologyName = getTopologyName();
+ final StringBuilder selectorLabel = new StringBuilder();
+
+ // Generate selector label: comma separated `key=value` pairs, one per claim label.
+ for (Map.Entry<String, String> label
+ : getPersistentVolumeClaimLabels(topologyName).entrySet()) {
+ if (selectorLabel.length() != 0) {
+ selectorLabel.append(",");
+ }
+
selectorLabel.append(label.getKey()).append("=").append(label.getValue());
+ }
+
+ // Remove all dynamically backed Persistent Volume Claims matching the selector in one request.
+ try {
+ V1Status status =
coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
+ getNamespace(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ selectorLabel.toString(), // presumably the label selector parameter - confirm against the client API
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ LOG.log(Level.INFO,
+ String.format("Removing automatically generated Persistent Volume
Claims for `%s`:%n%s",
+ topologyName, status.getMessage()));
+ } catch (ApiException e) {
+ final String message = String.format("Failed to connect to K8s cluster
to delete Persistent "
+ + "Volume Claims for topology `%s`. A manual clean-up is
required.%n%s",
+ topologyName, e.getMessage());
+ LOG.log(Level.WARNING, message);
+ throw new TopologyRuntimeManagementException(message);
+ }
+ }
+
+ /**
+ * Generates the <code>Label</code> which are attached to a Topologies
Persistent Volume Claims.
+ * @param topologyName Attached to the topology match label.
+ * @return A map consisting of the <code>label-value</code> pairs to be used
in <code>Label</code>s.
+ */
+ @VisibleForTesting
+ protected static Map<String, String> getPersistentVolumeClaimLabels(String
topologyName) {
+ return new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+ put(KubernetesConstants.LABEL_ON_DEMAND, "true");
+ }
+ };
+ }
+
+ /**
* Generates the <code>Selector</code> match labels with which resources in
this topology can be found.
* @return A label of the form <code>app=heron,topology=topology-name</code>.
*/
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
index 9a762d6..95de4a8 100644
---
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesContextTest.java
@@ -19,14 +19,24 @@
package org.apache.heron.scheduler.kubernetes;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Test;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
import org.apache.heron.spi.common.Config;
+import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+
public class KubernetesContextTest {
- public static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
+ private static final String TOPOLOGY_NAME = "Topology-Name";
+ private static final String KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME =
"heron.kubernetes.pod.template.configmap.name";
private static final String POD_TEMPLATE_CONFIGMAP_NAME =
"pod-template-configmap-name";
private final Config config = Config.newBuilder().build();
@@ -59,4 +69,129 @@ public class KubernetesContextTest {
Assert.assertTrue(KubernetesContext
.getPodTemplateConfigMapDisabled(configWithPodTemplateConfigMapOff));
}
+
+  @Test
+  public void testPersistentVolumeClaimDisabled() {
+    // The flag is absent from both of these configurations, so the feature stays enabled.
+    final boolean disabledOnEmptyConfig =
+        KubernetesContext.getPersistentVolumeClaimDisabled(config);
+    Assert.assertFalse(disabledOnEmptyConfig);
+    final boolean disabledOnPodTemplateConfig =
+        KubernetesContext.getPersistentVolumeClaimDisabled(configWithPodTemplateConfigMap);
+    Assert.assertFalse(disabledOnPodTemplateConfig);
+
+    // The flag is matched without regard to case.
+    final Config configWithFlagSet = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_CONFIGMAP_NAME,
+            POD_TEMPLATE_CONFIGMAP_NAME)
+        .put(KubernetesContext.KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED, "TRUE")
+        .build();
+    final boolean disabledOnFlagSet =
+        KubernetesContext.getPersistentVolumeClaimDisabled(configWithFlagSet);
+    Assert.assertTrue(disabledOnFlagSet);
+  }
+
+ @Test
+ public void testGetVolumeClaimTemplates() { // Two volumes with identical options must be returned intact; an empty config must yield an empty map.
+ final String volumeNameOne = "volume-name-one";
+ final String volumeNameTwo = "volume-name-two";
+ final String claimName = "OnDeMaNd";
+ final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+ "%s.%s";
+
+ final String storageClassField =
VolumeClaimTemplateConfigKeys.storageClassName.name();
+ final String pathField = VolumeClaimTemplateConfigKeys.path.name();
+ final String claimNameField =
VolumeClaimTemplateConfigKeys.claimName.name();
+ final String expectedStorageClass = "expected-storage-class";
+ final String storageClassKeyOne = String.format(keyPattern, volumeNameOne,
storageClassField);
+ final String storageClassKeyTwo = String.format(keyPattern, volumeNameTwo,
storageClassField);
+ final String expectedPath = "/path/for/volume/expected";
+ final String pathKeyOne = String.format(keyPattern, volumeNameOne,
pathField);
+ final String pathKeyTwo = String.format(keyPattern, volumeNameTwo,
pathField);
+ final String claimNameKeyOne = String.format(keyPattern, volumeNameOne,
claimNameField);
+ final String claimNameKeyTwo = String.format(keyPattern, volumeNameTwo,
claimNameField);
+
+ final Config configPVC = Config.newBuilder()
+ .put(pathKeyOne, expectedPath)
+ .put(pathKeyTwo, expectedPath)
+ .put(claimNameKeyOne, claimName)
+ .put(claimNameKeyTwo, claimName)
+ .put(storageClassKeyOne, expectedStorageClass)
+ .put(storageClassKeyTwo, expectedStorageClass)
+ .build();
+
+ final List<String> expectedKeys = Arrays.asList(volumeNameOne,
volumeNameTwo);
+ final List<VolumeClaimTemplateConfigKeys> expectedOptionsKeys =
+ Arrays.asList(VolumeClaimTemplateConfigKeys.path,
+ VolumeClaimTemplateConfigKeys.storageClassName,
+ VolumeClaimTemplateConfigKeys.claimName);
+ final List<String> expectedOptionsValues =
+ Arrays.asList(expectedPath, expectedStorageClass, claimName);
+
+ // Parse the provided PVC options; the mixed-case `OnDeMaNd` claim name is accepted as `OnDemand`.
+ final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapOfPVC =
+ KubernetesContext.getVolumeClaimTemplates(configPVC);
+
+ Assert.assertTrue("Contains all provided Volumes",
+ mapOfPVC.keySet().containsAll(expectedKeys));
+ for (Map<VolumeClaimTemplateConfigKeys, String> items : mapOfPVC.values())
{
+ Assert.assertTrue("Contains all provided option keys",
+ items.keySet().containsAll(expectedOptionsKeys));
+ Assert.assertTrue("Contains all provided option values",
+ items.values().containsAll(expectedOptionsValues));
+ }
+
+ // Empty PVC: a configuration without any volume claim keys produces an empty map.
+ final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> emptyPVC =
+ KubernetesContext.getVolumeClaimTemplates(Config.newBuilder().build());
+ Assert.assertTrue("Empty PVC is returned when no options provided",
emptyPVC.isEmpty());
+ }
+
+  /**
+   * Verifies that malformed Persistent Volume Claim parameters are rejected with a
+   * <code>TopologySubmissionException</code> carrying the expected message fragment.
+   * A test case which raises no exception at all is reported as a failure.
+   */
+  @Test
+  public void testGetPersistentVolumeClaimsErrors() {
+    final String volumeNameValid = "volume-name-valid";
+    final String volumeNameInvalid = "volume-Name-Invalid";
+    final String failureValue = "Should-Fail";
+    final String generalFailureMessage = "Invalid Persistent Volume";
+    final String keyPattern = KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX
+        + "%s.%s";
+    final List<TestTuple<Config, String>> testCases = new LinkedList<>();
+
+    // Invalid option key test.
+    final Config configInvalidOption = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "NonExistentKey"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid option key should trigger exception",
+        configInvalidOption, generalFailureMessage));
+
+    // Just the prefix.
+    final Config configJustPrefix = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Only a key prefix should trigger exception",
+        configJustPrefix, generalFailureMessage));
+
+    // Invalid Volume Name.
+    final Config configInvalidVolumeName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameInvalid, "path"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Volume Name should trigger exception",
+        configInvalidVolumeName, "lowercase RFC-1123"));
+
+    // Invalid Claim Name.
+    final Config configInvalidClaimName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "claimName"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Claim Name should trigger exception",
+        configInvalidClaimName, "Option `claimName`"));
+
+    // Invalid Storage Class Name.
+    final Config configInvalidStorageClassName = Config.newBuilder()
+        .put(String.format(keyPattern, volumeNameValid, "storageClassName"), failureValue)
+        .build();
+    testCases.add(new TestTuple<>("Invalid Storage Class Name should trigger exception",
+        configInvalidStorageClassName, "Option `storageClassName`"));
+
+    // Testing loop. The message stays empty when nothing is thrown, so the assertion below
+    // also fails for a test case which was wrongly accepted.
+    for (TestTuple<Config, String> testCase : testCases) {
+      String message = "";
+      try {
+        KubernetesContext.getVolumeClaimTemplates(testCase.input);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
}
diff --git
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
index c777383..b543ea9 100644
---
a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++
b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
@@ -21,8 +21,12 @@ package org.apache.heron.scheduler.kubernetes;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Rule;
@@ -36,6 +40,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.Resource;
@@ -49,6 +54,8 @@ import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -59,6 +66,8 @@ import io.kubernetes.client.openapi.models.V1VolumeBuilder;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
+import static
org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeClaimTemplateConfigKeys;
+import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -737,4 +746,284 @@ public class V1ControllerTest {
CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
expectedTolerationsOverriding));
}
+
+ @Test
+ public void testCreatePersistentVolumeClaims() {
+ final String topologyName = "topology-name";
+ final String volumeNameOne = "volume-name-one";
+ final String volumeNameTwo = "volume-name-two";
+ final String volumeNameStatic = "volume-name-static";
+ final String claimNameOne = "OnDemand";
+ final String claimNameTwo = "claim-name-two";
+ final String claimNameStatic = "OnDEmaND";
+ final String storageClassName = "storage-class-name";
+ final String sizeLimit = "555Gi";
+ final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+ final String accessModes = "ReadOnlyMany";
+ final String volumeMode = "VolumeMode";
+ final String path = "/path/to/mount/";
+ final String subPath = "/sub/path/to/mount/";
+ final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> mapPVCOpts =
+ ImmutableMap.of(
+ volumeNameOne, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ {
+ put(VolumeClaimTemplateConfigKeys.claimName, claimNameOne);
+ put(VolumeClaimTemplateConfigKeys.storageClassName,
storageClassName);
+ put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeClaimTemplateConfigKeys.accessModes,
accessModesList);
+ put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+ put(VolumeClaimTemplateConfigKeys.path, path);
+ }
+ },
+ volumeNameTwo, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ {
+ put(VolumeClaimTemplateConfigKeys.claimName, claimNameTwo);
+ put(VolumeClaimTemplateConfigKeys.storageClassName,
storageClassName);
+ put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
+ put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+ put(VolumeClaimTemplateConfigKeys.path, path);
+ put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+ }
+ },
+ volumeNameStatic, new HashMap<VolumeClaimTemplateConfigKeys,
String>() {
+ {
+ put(VolumeClaimTemplateConfigKeys.claimName, claimNameStatic);
+ put(VolumeClaimTemplateConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeClaimTemplateConfigKeys.accessModes, accessModes);
+ put(VolumeClaimTemplateConfigKeys.volumeMode, volumeMode);
+ put(VolumeClaimTemplateConfigKeys.path, path);
+ put(VolumeClaimTemplateConfigKeys.subPath, subPath);
+ }
+ }
+ );
+
+ final V1PersistentVolumeClaim claimOne = new
V1PersistentVolumeClaimBuilder()
+ .withNewMetadata()
+ .withName(volumeNameOne)
+
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+ .endMetadata()
+ .withNewSpec()
+ .withStorageClassName(storageClassName)
+ .withAccessModes(Arrays.asList(accessModesList.split(",")))
+ .withVolumeMode(volumeMode)
+ .withNewResources()
+ .addToRequests("storage", new Quantity(sizeLimit))
+ .endResources()
+ .endSpec()
+ .build();
+
+ final V1PersistentVolumeClaim claimStatic = new
V1PersistentVolumeClaimBuilder()
+ .withNewMetadata()
+ .withName(volumeNameStatic)
+
.withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+ .endMetadata()
+ .withNewSpec()
+ .withAccessModes(Collections.singletonList(accessModes))
+ .withVolumeMode(volumeMode)
+ .withNewResources()
+ .addToRequests("storage", new Quantity(sizeLimit))
+ .endResources()
+ .endSpec()
+ .build();
+
+ final List<V1PersistentVolumeClaim> expectedClaims =
+ new LinkedList<>(Arrays.asList(claimOne, claimStatic));
+
+ final List<V1PersistentVolumeClaim> actualClaims =
+ v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
+
+ Assert.assertTrue(expectedClaims.containsAll(actualClaims));
+ }
+
+ /**
+  * Exercises <code>createPersistentVolumeClaimVolumesAndMounts</code> with an empty option map
+  * and with two configured volumes, then checks that every generated <code>V1Volume</code> and
+  * <code>V1VolumeMount</code> appears in the corresponding expected list.
+  */
+ @Test
+ public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+   final String firstVolume = "VolumeNameONE";
+   final String secondVolume = "VolumeNameTWO";
+   final String firstClaim = "claim-name-one";
+   final String secondClaim = "OnDemand";
+   final String firstPath = "/mount/path/ONE";
+   final String secondPath = "/mount/path/TWO";
+   final String secondSubPath = "/mount/sub/path/TWO";
+
+   // Per-volume options: the first volume has a claim name and path, the second adds a sub-path.
+   final Map<String, Map<VolumeClaimTemplateConfigKeys, String>> pvcOptions =
+       ImmutableMap.<String, Map<VolumeClaimTemplateConfigKeys, String>>builder()
+           .put(firstVolume, ImmutableMap.of(
+               VolumeClaimTemplateConfigKeys.claimName, firstClaim,
+               VolumeClaimTemplateConfigKeys.path, firstPath))
+           .put(secondVolume, ImmutableMap.of(
+               VolumeClaimTemplateConfigKeys.claimName, secondClaim,
+               VolumeClaimTemplateConfigKeys.path, secondPath,
+               VolumeClaimTemplateConfigKeys.subPath, secondSubPath))
+           .build();
+
+   // Expected Volumes.
+   final V1Volume expectedVolumeOne = new V1VolumeBuilder()
+       .withName(firstVolume)
+       .withNewPersistentVolumeClaim()
+         .withClaimName(firstClaim)
+       .endPersistentVolumeClaim()
+       .build();
+   final V1Volume expectedVolumeTwo = new V1VolumeBuilder()
+       .withName(secondVolume)
+       .withNewPersistentVolumeClaim()
+         .withClaimName(secondClaim)
+       .endPersistentVolumeClaim()
+       .build();
+
+   // Expected Volume Mounts.
+   final V1VolumeMount expectedMountOne = new V1VolumeMountBuilder()
+       .withName(firstVolume)
+       .withMountPath(firstPath)
+       .build();
+   final V1VolumeMount expectedMountTwo = new V1VolumeMountBuilder()
+       .withName(secondVolume)
+       .withMountPath(secondPath)
+       .withSubPath(secondSubPath)
+       .build();
+
+   // Invoke the method under test: first without any PVC options, then with both volumes.
+   final Pair<List<V1Volume>, List<V1VolumeMount>> generatedEmpty =
+       v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(new HashMap<>());
+   final Pair<List<V1Volume>, List<V1VolumeMount>> generatedFull =
+       v1ControllerPodTemplate.createPersistentVolumeClaimVolumesAndMounts(pvcOptions);
+
+   final Pair<List<V1Volume>, List<V1VolumeMount>> expectedEmpty =
+       new Pair<>(new LinkedList<>(), new LinkedList<>());
+   final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
+       new Pair<>(
+           Arrays.asList(expectedVolumeOne, expectedVolumeTwo),
+           Arrays.asList(expectedMountOne, expectedMountTwo));
+
+   // Test case container. Input: generated Volumes and Mounts. Output: expected ones.
+   final List<TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+       Pair<List<V1Volume>, List<V1VolumeMount>>>> scenarios = new LinkedList<>();
+   scenarios.add(new TestTuple<>("Generated an empty list of Volumes",
+       generatedEmpty, expectedEmpty));
+   scenarios.add(new TestTuple<>("Generated a list of Volumes",
+       generatedFull, expectedFull));
+
+   // NOTE(review): <containsAll> only proves that the generated items are a subset of the
+   // expected ones; consider also comparing sizes once the expected data is confirmed.
+   for (TestTuple<Pair<List<V1Volume>, List<V1VolumeMount>>,
+       Pair<List<V1Volume>, List<V1VolumeMount>>> scenario : scenarios) {
+     final List<V1Volume> expectedVolumes = scenario.expected.first;
+     final List<V1VolumeMount> expectedMounts = scenario.expected.second;
+
+     Assert.assertTrue(scenario.description,
+         expectedVolumes.containsAll(scenario.input.first));
+     Assert.assertTrue(scenario.description + " Mounts",
+         expectedMounts.containsAll(scenario.input.second));
+   }
+ }
+
+ /**
+  * Checks how <code>configurePodWithPersistentVolumeClaimVolumesAndMounts</code> merges the
+  * Volumes and Volume Mounts returned by the stubbed
+  * <code>createPersistentVolumeClaimVolumesAndMounts</code> into an existing Pod Spec and
+  * Executor container. Three cases are covered: nothing to add, additions with new names, and
+  * additions whose names match existing entries.
+  */
+ @Test
+ public void testConfigurePodWithPersistentVolumeClaims() {
+   final String volumeNameClashing = "clashing-volume";
+   final String volumeMountNameClashing = "original-volume-mount";
+
+   // Entries present in the Pod Spec and Executor before configuration.
+   V1Volume baseVolume = new V1VolumeBuilder()
+       .withName(volumeNameClashing)
+       .withNewPersistentVolumeClaim()
+         .withClaimName("Original Base Claim Name")
+       .endPersistentVolumeClaim()
+       .build();
+   V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
+       .withName(volumeMountNameClashing)
+       .withMountPath("/original/mount/path")
+       .build();
+
+   // Entries sharing their names with the base entries above.
+   V1Volume clashingVolume = new V1VolumeBuilder()
+       .withName(volumeNameClashing)
+       .withNewPersistentVolumeClaim()
+         .withClaimName("Clashing Claim Replaced")
+       .endPersistentVolumeClaim()
+       .build();
+   V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
+       .withName(volumeMountNameClashing)
+       .withMountPath("/clashing/mount/path")
+       .build();
+
+   // Entries with names not used by the base entries.
+   V1Volume secondaryVolume = new V1VolumeBuilder()
+       .withName("secondary-volume")
+       .withNewPersistentVolumeClaim()
+         .withClaimName("Original Secondary Claim Name")
+       .endPersistentVolumeClaim()
+       .build();
+   V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
+       .withName("secondary-volume-mount")
+       .withMountPath("/secondary/mount/path")
+       .build();
+
+   // Test case container.
+   // Input: Pod Spec to modify, Executor to modify, Volumes and Mounts to return from
+   // <createPersistentVolumeClaimVolumesAndMounts>.
+   // Output: The expected <V1PodSpec> and <V1Container>.
+   final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>();
+
+   // No Persistent Volume Claim.
+   final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+   final V1Container executorEmptyCase =
+       new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+   final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+   final V1Container expectedEmptyExecutor =
+       new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+   Pair<List<V1Volume>, List<V1VolumeMount>> emptyVolumeAndMount =
+       new Pair<>(new LinkedList<>(), new LinkedList<>());
+
+   testCases.add(new TestTuple<>("Empty",
+       new Object[]{podSpecEmptyCase, executorEmptyCase, emptyVolumeAndMount},
+       new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
+
+   // Non-clashing Persistent Volume Claim.
+   final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
+       .withVolumes(baseVolume)
+       .build();
+   final V1Container executorNoClashCase = new V1ContainerBuilder()
+       .withVolumeMounts(baseVolumeMount)
+       .build();
+   final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
+       .addToVolumes(baseVolume)
+       .addToVolumes(secondaryVolume)
+       .build();
+   final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
+       .addToVolumeMounts(baseVolumeMount)
+       .addToVolumeMounts(secondaryVolumeMount)
+       .build();
+
+   Pair<List<V1Volume>, List<V1VolumeMount>> noClashVolumeAndMount = new Pair<>(
+       new LinkedList<>(Collections.singletonList(secondaryVolume)),
+       new LinkedList<>(Collections.singletonList(secondaryVolumeMount)));
+
+   testCases.add(new TestTuple<>("No Clash",
+       new Object[]{podSpecNoClashCase, executorNoClashCase, noClashVolumeAndMount},
+       new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
+
+   // Clashing Persistent Volume Claim: the expected result holds the clashing entries in place
+   // of the base entries with the same names.
+   final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
+       .withVolumes(baseVolume)
+       .build();
+   final V1Container executorClashCase = new V1ContainerBuilder()
+       .withVolumeMounts(baseVolumeMount)
+       .build();
+   final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
+       .addToVolumes(clashingVolume)
+       .addToVolumes(secondaryVolume)
+       .build();
+   final V1Container expectedClashExecutor = new V1ContainerBuilder()
+       .addToVolumeMounts(clashingVolumeMount)
+       .addToVolumeMounts(secondaryVolumeMount)
+       .build();
+
+   Pair<List<V1Volume>, List<V1VolumeMount>> clashVolumeAndMount = new Pair<>(
+       new LinkedList<>(Arrays.asList(clashingVolume, secondaryVolume)),
+       new LinkedList<>(Arrays.asList(clashingVolumeMount, secondaryVolumeMount)));
+
+   testCases.add(new TestTuple<>("Clashing",
+       new Object[]{podSpecClashCase, executorClashCase, clashVolumeAndMount},
+       new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
+
+   // Testing loop.
+   for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
+     // Stub the generator so that the method under test receives this case's Volumes and Mounts.
+     doReturn(testCase.input[2])
+         .when(v1ControllerWithPodTemplate)
+         .createPersistentVolumeClaimVolumesAndMounts(anyMap());
+
+     // The Pod Spec and Executor are modified in place.
+     v1ControllerWithPodTemplate
+         .configurePodWithPersistentVolumeClaimVolumesAndMounts((V1PodSpec) testCase.input[0],
+             (V1Container) testCase.input[1]);
+
+     // JUnit's argument order is (message, expected, actual); the mutated inputs are the actuals.
+     Assert.assertEquals("Pod Specs match " + testCase.description,
+         testCase.expected.first, testCase.input[0]);
+     Assert.assertEquals("Executors match " + testCase.description,
+         testCase.expected.second, testCase.input[1]);
+   }
+ }
}
diff --git a/website2/docs/schedulers-k8s-persistent-volume-claims.md
b/website2/docs/schedulers-k8s-persistent-volume-claims.md
new file mode 100644
index 0000000..994c22c
--- /dev/null
+++ b/website2/docs/schedulers-k8s-persistent-volume-claims.md
@@ -0,0 +1,257 @@
+---
+id: schedulers-k8s-persistent-volume-claims
+title: Kubernetes Persistent Volume Claims via CLI
+sidebar_label: Kubernetes Persistent Volume Claims (CLI)
+---
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+> This document demonstrates how you can utilize both static and dynamically
backed [Persistent Volume
Claims](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/) in
the `Executor` containers. You will need to enable Dynamic Provisioning in your
Kubernetes cluster to proceed to use the dynamic provisioning functionality.
+
+<br/>
+
+It is possible to leverage Persistent Volumes with custom Pod Templates but
the Volumes you add will be shared between all Pods in the topology.
+
+The CLI commands allow you to configure a Persistent Volume Claim (dynamically
or statically backed) which will be unique and isolated to each Pod and mounted
in a single `Executor` when you submit your topology with a Claim name of
`OnDemand`. Using any Claim name other than `OnDemand` will permit you to
configure a shared Persistent Volume without a custom Pod Template which will
be specific to an individual Pod. The CLI commands override any configurations
you may have present in t [...]
+
+Some use cases include process checkpointing, caching of results for later use
in the process, intermediate results which could prove useful in analysis
(ETL/ELT to a data lake or warehouse), as a source of data enrichment, etc.
+
+**Note:** Heron ***will*** remove any dynamically backed Persistent Volume
Claims it creates when a topology is terminated. Please be aware that Heron
uses the following `Labels` to locate the claims it has created:
+```yaml
+metadata:
+ labels:
+ topology: <topology-name>
+ onDemand: "true"
+```
+
+<br>
+
+> ***System Administrators:***
+>
+> * You may wish to disable the ability to configure dynamic Persistent Volume
Claims specified on the CLI. To achieve this, you must pass the define option
`-D heron.kubernetes.persistent.volume.claims.cli.disabled=true` to the Heron
API Server on the command line during boot. This command has been added to the
Kubernetes configuration files to deploy the Heron API Server and is set to
`false` by default.
+> * If you have a custom `Role`/`ClusterRole` for the Heron API Server you
will need to ensure the `ServiceAccount` attached to the API server has the
correct permissions to access the `Persistent Volume Claim`s:
+>
+>```yaml
+>rules:
+>- apiGroups:
+> - ""
+> resources:
+> - persistentvolumeclaims
+> verbs:
+> - create
+> - delete
+> - get
+> - list
+> - deletecollection
+>```
+
+<br>
+
+## Usage
+
+To configure a Persistent Volume Claim you must use the `--config-property`
option with the `heron.kubernetes.volumes.persistentVolumeClaim.` command
prefix. Heron will not validate your Persistent Volume Claim configurations, so
please validate them to ensure they are well-formed. All names must comply with
the [*lowercase
RFC-1123*](https://kubernetes.io/docs/concepts/overview/working-with-objects/names/)
standard.
+
+The command pattern is as follows:
+`heron.kubernetes.volumes.persistentVolumeClaim.[VOLUME NAME].[OPTION]=[VALUE]`
+
+The currently supported CLI `options` are:
+
+* `claimName`
+* `storageClassName`
+* `sizeLimit`
+* `accessModes`
+* `volumeMode`
+* `path`
+* `subPath`
+
+***Note:*** A `claimName` of `OnDemand` will create unique Volumes for each
`Executor` as well as deploy a Persistent Volume Claim for each Volume. Any
other Claim name will result in a shared Volume being created between all Pods
in the topology.
+
+***Note:*** The `accessModes` must be a comma separated list of values
*without* any white space. Valid values can be found in the [Kubernetes
documentation](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes).
+
+***Note:*** If a `storageClassName` is specified and there are no matching
Persistent Volumes then [dynamic
provisioning](https://kubernetes.io/docs/concepts/storage/dynamic-provisioning/)
must be enabled. Kubernetes will attempt to locate a Persistent Volume that
matches the `storageClassName` before it attempts to use dynamic provisioning.
If a `storageClassName` is not specified there must be [Persistent
Volumes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-persi
[...]
+
+<br>
+
+### Example
+
+An example series of commands and the `YAML` entries they make in their
respective configurations are as follows.
+
+***Dynamic:***
+
+```bash
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.storageClassName=storage-class-name-of-choice
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+Generated `Persistent Volume Claim`:
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ labels:
+ app: heron
+ onDemand: "true"
+ topology: <topology-name>
+ name: volumenameofchoice-<topology-name>-[Ordinal]
+spec:
+ accessModes:
+ - comma
+ - separated
+ - list
+ resources:
+ requests:
+ storage: 555Gi
+ storageClassName: storage-class-name-of-choice
+ volumeMode: volume-mode-of-choice
+```
+
+Pod Spec entries for `Volume`:
+
+```yaml
+volumes:
+ - name: volumenameofchoice
+ persistentVolumeClaim:
+ claimName: volumenameofchoice-<topology-name>-[Ordinal]
+```
+
+`Executor` container entries for `Volume Mounts`:
+
+```yaml
+volumeMounts:
+ - mountPath: /path/to/mount
+ subPath: /sub/path/to/mount
+ name: volumenameofchoice
+```
+
+<br>
+
+***Static:***
+
+```bash
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+Generated `Persistent Volume Claim`:
+
+```yaml
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+ labels:
+ app: heron
+ onDemand: "true"
+ topology: <topology-name>
+ name: volumenameofchoice-<topology-name>-[Ordinal]
+spec:
+ accessModes:
+ - comma
+ - separated
+ - list
+ resources:
+ requests:
+ storage: 555Gi
+ storageClassName: standard
+ volumeMode: volume-mode-of-choice
+```
+
+Pod Spec entries for `Volume`:
+
+```yaml
+volumes:
+ - name: volumenameofchoice
+ persistentVolumeClaim:
+ claimName: volumenameofchoice-<topology-name>-[Ordinal]
+```
+
+`Executor` container entries for `Volume Mounts`:
+
+```yaml
+volumeMounts:
+ - mountPath: /path/to/mount
+ subPath: /sub/path/to/mount
+ name: volumenameofchoice
+```
+
+<br>
+
+## Submitting
+
+An example of submitting a topology using the *dynamic* example CLI commands
above:
+
+```bash
+heron submit kubernetes \
+
--service-url=http://localhost:8001/api/v1/namespaces/default/services/heron-apiserver:9000/proxy
\
+ ~/.heron/examples/heron-api-examples.jar \
+ org.apache.heron.examples.api.AckingTopology acking \
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.claimName=OnDemand
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.storageClassName=storage-class-name-of-choice
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.accessModes=comma,separated,list
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.sizeLimit=555Gi
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.volumeMode=volume-mode-of-choice
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.path=/path/to/mount
\
+--config-property
heron.kubernetes.volumes.persistentVolumeClaim.volumenameofchoice.subPath=/sub/path/to/mount
+```
+
+## Required and Optional Configuration Items
+
+The following table outlines CLI options which are either ***required*** (
&#x2705; ), ***optional*** ( &#x2754; ), or ***not available*** ( &#x274c; )
depending on whether you are using a dynamically backed, statically backed, or shared `Volume`.
+
+| Option | Dynamic | Static | Shared
+|---|---|---|---|
+| `VOLUME NAME` | ✅ | ✅ | ✅
+| `claimName` | `OnDemand` | `OnDemand` | A valid name
+| `path` | ✅ | ✅ | ✅
+| `subPath` | ❔ | ❔ | ❔
+| `storageClassName` | ✅ | ❌ | ❌
+| `accessModes` | ✅ | ✅ | ❌
+| `sizeLimit` | ❔ | ❔ | ❌
+| `volumeMode` | ❔ | ❔ | ❌
+
+<br>
+
+***Note:*** The `VOLUME NAME` will be extracted from the CLI command and a
`claimName` is always required.
+
+<br>
+
+## Configuration Items Created and Entries Made
+
+The configuration items and entries in the tables below will be made in their
respective areas.
+
+One `Persistent Volume Claim`, a `Volume`, and a `Volume Mount` will be
created for each `volume name` which you specify. Each will be unique to a Pod
within the topology.
+
+| Name | Description | Policy |
+|---|---|---|
+| `VOLUME NAME` | The `name` of the `Volume`. | Entries made in the
`Persistent Volume Claim`'s spec, the Pod Spec's `Volumes`, and the `executor`
container's `volumeMounts`.
+| `claimName` | A Claim name for the Persistent Volume. | If `OnDemand` is
provided as the parameter then a unique Volume and Persistent Volume Claim will
be created. Any other name will result in a shared Volume between all Pods in
the topology with only a Volume and Volume Mount being added.
+| `path` | The `mountPath` of the `Volume`. | Entries made in the `executor`
container's `volumeMounts`.
+| `subPath` | The `subPath` of the `Volume`. | Entries made in the `executor`
container's `volumeMounts`.
+| `storageClassName` | The identifier name used to reference the dynamic
`StorageClass`. | Entries made in the `Persistent Volume Claim` and Pod Spec's
`Volume`.
+| `accessModes` | A comma separated list of access modes. | Entries made in
the `Persistent Volume Claim`.
+| `sizeLimit` | A resource request for storage space. | Entries made in the
`Persistent Volume Claim`.
+| `volumeMode` | Either `Filesystem` (default) or `Block` (raw block). [Read
more](https://kubernetes.io/docs/concepts/storage/_print/#volume-mode). |
Entries made in the `Persistent Volume Claim`.
+| Labels | Two labels for `topology` and `onDemand` provisioning are added. |
These labels are only added to dynamically backed `Persistent Volume Claim`s
created by Heron to support the removal of any claims created when a topology
is terminated.
diff --git a/website2/website/sidebars.json b/website2/website/sidebars.json
index bab3043..c803fdf 100755
--- a/website2/website/sidebars.json
+++ b/website2/website/sidebars.json
@@ -54,6 +54,7 @@
"schedulers-k8s-by-hand",
"schedulers-k8s-with-helm",
"schedulers-k8s-pod-templates",
+ "schedulers-k8s-persistent-volume-claims",
"schedulers-aurora-cluster",
"schedulers-aurora-local",
"schedulers-local",