surahman commented on a change in pull request #3747:
URL: https://github.com/apache/incubator-heron/pull/3747#discussion_r766850817
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
.withName(volumeName)
- .withMountPath(path);
- if (subPath != null && !subPath.isEmpty()) {
- volumeMount.withSubPath(subPath);
+ .withNewEmptyDir()
+ .endEmptyDir()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case medium:
+ volume.getEmptyDir().medium(config.getValue());
+ break;
+ case sizeLimit:
+ volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+ break;
+ default:
+ break;
+ }
}
- mountList.add(volumeMount.build());
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
}
- return new Pair<>(volumeList, mountList);
}
/**
- * Makes a call to generate <code>Volumes</code> and <code>Volume
Mounts</code> and then inserts them.
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewHostPath()
+ .endHostPath()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case type:
+ volume.getHostPath().setType(config.getValue());
+ break;
+ case pathOnHost:
+ volume.getHostPath().setPath(config.getValue());
+ break;
+ default:
+ break;
+ }
+ }
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>NFS</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsNFSCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Good catch. Declaring these as `final` will potentially allow the Java compiler
to apply some optimizations. Regrettably, `final` is not the same as `const` in
C/C++, where it makes the object itself immutable. It will, however, allow the
IDE and compiler to raise an error if you try to reassign the reference to a
different object. Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
.withName(volumeName)
- .withMountPath(path);
- if (subPath != null && !subPath.isEmpty()) {
- volumeMount.withSubPath(subPath);
+ .withNewEmptyDir()
+ .endEmptyDir()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case medium:
+ volume.getEmptyDir().medium(config.getValue());
+ break;
+ case sizeLimit:
+ volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+ break;
+ default:
+ break;
+ }
}
- mountList.add(volumeMount.build());
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
}
- return new Pair<>(volumeList, mountList);
}
/**
- * Makes a call to generate <code>Volumes</code> and <code>Volume
Mounts</code> and then inserts them.
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
Review comment:
Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -519,11 +524,12 @@ private V1StatefulSet createStatefulSet(Resource
containerResource, int numberOf
* @param resource Passed down to configure the resource limits.
* @param numberOfInstances Passed down to configure the ports.
* @param isExecutor Flag used to configure components specific to
<code>Executor</code> and <code>Manager</code>.
- * @param configPVC <code>Persistent Volume Claim</code> configurations
options.
+ * @param volumes <code>Volumes</code> generated from configuration options.
+ * @param volumeMounts <code>Volume Mounts</code> generated from
configuration options.
*/
private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec,
Resource resource,
Review comment:
Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+
+ // Claim name is required.
+ if
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
{
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Persistent Volume"
+ + " Claims require a `claimName`.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+ final String value = volumeConfig.getValue();
+
+ switch (key) {
+ case claimName:
+ // Claim names which are not OnDemand should be lowercase RFC-1123.
+ if (!matcher.reset(value).matches()
+ &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `claimName` does"
+ + " not match lowercase RFC-1123 pattern", volume.getKey()));
+ }
+ break;
+ case storageClassName:
+ if (!matcher.reset(value).matches()) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `storageClassName`"
+ + " does not match lowercase RFC-1123 pattern",
volume.getKey()));
+ }
+ break;
+ case sizeLimit: case accessModes: case volumeMode: case readOnly:
case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Persistent"
+ + " Volume Claim type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>Empty Directory</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Empty Directory Volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeEmptyDir(Config config, boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String medium =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.medium);
+
+ if (medium != null && !medium.isEmpty() && !"Memory".equals(medium)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Empty Directory"
+ + " 'medium' must be 'Memory' or empty.", volume.getKey()));
+ }
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case sizeLimit: case medium: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Empty"
+ + " Directory type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>Host Path</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Host Path Volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeHostPath(Config config, boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String type =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.type);
+ if (type != null &&
!KubernetesConstants.VALID_VOLUME_HOSTPATH_TYPES.contains(type)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path"
+ + " 'type' of '%s' is invalid.", volume.getKey(), type));
+ }
+ final String hostOnPath =
+
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnHost);
+ if (hostOnPath == null || hostOnPath.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path requires a"
+ + " path on the host.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case type: case pathOnHost: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Host Path"
+ + " option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>NFS</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no NFS Volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
Review comment:
Changes effected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+
+ // Claim name is required.
+ if
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
{
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Persistent Volume"
+ + " Claims require a `claimName`.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+ final String value = volumeConfig.getValue();
+
+ switch (key) {
+ case claimName:
+ // Claim names which are not OnDemand should be lowercase RFC-1123.
+ if (!matcher.reset(value).matches()
+ &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `claimName` does"
+ + " not match lowercase RFC-1123 pattern", volume.getKey()));
+ }
+ break;
+ case storageClassName:
+ if (!matcher.reset(value).matches()) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `storageClassName`"
+ + " does not match lowercase RFC-1123 pattern",
volume.getKey()));
+ }
+ break;
+ case sizeLimit: case accessModes: case volumeMode: case readOnly:
case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Persistent"
+ + " Volume Claim type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>Empty Directory</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Empty Directory Volumes to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeEmptyDir(Config config, boolean isExecutor) {
Review comment:
Changes affected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters from the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty map if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
Review comment:
Changes affected.
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -242,44 +253,51 @@ public static boolean getPodTemplateDisabled(Config
config) {
return getConfigItemsByPrefix(config, key);
}
- public static boolean getPersistentVolumeClaimDisabled(Config config) {
- final String disabled =
config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED);
+ public static boolean getVolumesFromCLIDisabled(Config config) {
+ final String disabled =
config.getStringValue(KUBERNETES_VOLUME_FROM_CLI_DISABLED);
return "true".equalsIgnoreCase(disabled);
}
/**
* Collects parameters from the <code>CLI</code> and generates a mapping
between <code>Volumes</code>
* and their configuration <code>key-value</code> pairs.
* @param config Contains the configuration options collected from the
<code>CLI</code>.
- * @param isExecutor Flag used to collect CLI commands for the
<code>executor</code> and <code>manager</code>.
+ * @param prefix Configuration key to lookup for options.
+ * @param isExecutor Flag used to switch CLI commands for the
<code>Executor</code> and <code>Manager</code>.
* @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
* Will return an empty map if there are no Volume Claim Templates to be
generated.
*/
- public static Map<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
- getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ @VisibleForTesting
+ protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys,
String>>
Review comment:
Changes affected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]