joshfischer1108 commented on a change in pull request #3747:
URL: https://github.com/apache/incubator-heron/pull/3747#discussion_r766603834
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -415,17 +415,21 @@ private V1StatefulSet createStatefulSet(Resource
containerResource, int numberOf
final String topologyName = getTopologyName();
final Config runtimeConfiguration = getRuntimeConfiguration();
- // Get and then create Persistent Volume Claims from the CLI.
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> configsPVC =
+ final List<V1Volume> volumes = new LinkedList<>();
Review comment:
Only a question out of curiosity. Why did you choose the LinkedList
implementation?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>s
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>s
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
.withName(volumeName)
- .withMountPath(path);
- if (subPath != null && !subPath.isEmpty()) {
- volumeMount.withSubPath(subPath);
+ .withNewEmptyDir()
+ .endEmptyDir()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case medium:
+ volume.getEmptyDir().medium(config.getValue());
+ break;
+ case sizeLimit:
+ volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+ break;
+ default:
+ break;
+ }
}
- mountList.add(volumeMount.build());
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
}
- return new Pair<>(volumeList, mountList);
}
/**
- * Makes a call to generate <code>Volumes</code> and <code>Volume
Mounts</code> and then inserts them.
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -519,11 +524,12 @@ private V1StatefulSet createStatefulSet(Resource
containerResource, int numberOf
* @param resource Passed down to configure the resource limits.
* @param numberOfInstances Passed down to configure the ports.
* @param isExecutor Flag used to configure components specific to
<code>Executor</code> and <code>Manager</code>.
- * @param configPVC <code>Persistent Volume Claim</code> configurations
options.
+ * @param volumes <code>Volumes</code> generated from configurations options.
+ * @param volumeMounts <code>Volume Mounts</code> generated from
configurations options.
*/
private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec,
Resource resource,
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -242,44 +253,51 @@ public static boolean getPodTemplateDisabled(Config
config) {
return getConfigItemsByPrefix(config, key);
}
- public static boolean getPersistentVolumeClaimDisabled(Config config) {
- final String disabled =
config.getStringValue(KUBERNETES_PERSISTENT_VOLUME_CLAIMS_CLI_DISABLED);
+ public static boolean getVolumesFromCLIDisabled(Config config) {
+ final String disabled =
config.getStringValue(KUBERNETES_VOLUME_FROM_CLI_DISABLED);
return "true".equalsIgnoreCase(disabled);
}
/**
* Collects parameters form the <code>CLI</code> and generates a mapping
between <code>Volumes</code>
* and their configuration <code>key-value</code> pairs.
* @param config Contains the configuration options collected from the
<code>CLI</code>.
- * @param isExecutor Flag used to collect CLI commands for the
<code>executor</code> and <code>manager</code>.
+ * @param prefix Configuration key to lookup for options.
+ * @param isExecutor Flag used to switch CLI commands for the
<code>Executor</code> and <code>Manager</code>.
* @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
* Will return an empty list if there are no Volume Claim Templates to be
generated.
*/
- public static Map<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>>
- getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ @VisibleForTesting
+ protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys,
String>>
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
##########
@@ -1101,80 +1104,212 @@ protected V1ConfigMap getConfigMap(String
configMapName) {
}
/**
- * Generates the <code>Volume</code> and <code>Volume Mounts</code> to be
placed in the <code>executor container</code>.
- * @param mapConfig Mapping of <code>Volumes</code> to
<code>key-value</code> configuration pairs.
- * @return A pair of configured lists of <code>V1Volume</code> and
<code>V1VolumeMount</code>.
+ * Generates the <code>Volume Mounts</code> to be placed in the
<code>Executor</code>
+ * and <code>Manager</code> from options on the CLI.
+ * @param volumeName Name of the <code>Volume</code>.
+ * @param configs Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @return A configured <code>V1VolumeMount</code>.
+ */
+ @VisibleForTesting
+ protected V1VolumeMount createVolumeMountsCLI(String volumeName,
+ Map<KubernetesConstants.VolumeConfigKeys, String> configs) {
+ final V1VolumeMount volumeMount = new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .build();
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config :
configs.entrySet()) {
+ switch (config.getKey()) {
+ case path:
+ volumeMount.mountPath(config.getValue());
+ break;
+ case subPath:
+ volumeMount.subPath(config.getValue());
+ break;
+ case readOnly:
+ volumeMount.readOnly(Boolean.parseBoolean(config.getValue()));
+ break;
+ default:
+ break;
+ }
+ }
+
+ return volumeMount;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Persistent Volume Claims</code>s
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from
options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
*/
@VisibleForTesting
- protected Pair<List<V1Volume>, List<V1VolumeMount>>
createPersistentVolumeClaimVolumesAndMounts(
- final Map<String, Map<KubernetesConstants.VolumeClaimTemplateConfigKeys,
String>> mapConfig) {
- List<V1Volume> volumeList = new LinkedList<>();
- List<V1VolumeMount> mountList = new LinkedList<>();
- for (Map.Entry<String,
Map<KubernetesConstants.VolumeClaimTemplateConfigKeys, String>> configs
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
: mapConfig.entrySet()) {
final String volumeName = configs.getKey();
- final String path = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.path);
- final String subPath = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.subPath);
-
- if (path == null || path.isEmpty()) {
- throw new TopologySubmissionException(
- String.format("A mount path is required and missing from '%s'",
volumeName));
- }
// Do not create Volumes for `OnDemand`.
final String claimName = configs.getValue()
- .get(KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName);
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
if (claimName != null &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
- final V1Volume volume = new V1VolumeBuilder()
- .withName(volumeName)
- .withNewPersistentVolumeClaim()
- .withClaimName(claimName)
- .endPersistentVolumeClaim()
- .build();
- volumeList.add(volume);
+ volumes.add(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .build()
+ );
}
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
- final V1VolumeMountBuilder volumeMount = new V1VolumeMountBuilder()
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
.withName(volumeName)
- .withMountPath(path);
- if (subPath != null && !subPath.isEmpty()) {
- volumeMount.withSubPath(subPath);
+ .withNewEmptyDir()
+ .endEmptyDir()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case medium:
+ volume.getEmptyDir().medium(config.getValue());
+ break;
+ case sizeLimit:
+ volume.getEmptyDir().sizeLimit(new Quantity(config.getValue()));
+ break;
+ default:
+ break;
+ }
}
- mountList.add(volumeMount.build());
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
}
- return new Pair<>(volumeList, mountList);
}
/**
- * Makes a call to generate <code>Volumes</code> and <code>Volume
Mounts</code> and then inserts them.
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewHostPath()
+ .endHostPath()
+ .build();
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> config
+ : configs.getValue().entrySet()) {
+ switch(config.getKey()) {
+ case type:
+ volume.getHostPath().setType(config.getValue());
+ break;
+ case pathOnHost:
+ volume.getHostPath().setPath(config.getValue());
+ break;
+ default:
+ break;
+ }
+ }
+ volumes.add(volume);
+ volumeMounts.add(createVolumeMountsCLI(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for
<code>NFS</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options
on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option
<code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsNFSCLI(
+ Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+
+ // Claim name is required.
+ if
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
{
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Persistent Volume"
+ + " Claims require a `claimName`.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+ final String value = volumeConfig.getValue();
+
+ switch (key) {
+ case claimName:
+ // Claim names which are not OnDemand should be lowercase RFC-1123.
+ if (!matcher.reset(value).matches()
+ &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `claimName` does"
+ + " not match lowercase RFC-1123 pattern", volume.getKey()));
+ }
+ break;
+ case storageClassName:
+ if (!matcher.reset(value).matches()) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `storageClassName`"
+ + " does not match lowercase RFC-1123 pattern",
volume.getKey()));
+ }
+ break;
+ case sizeLimit: case accessModes: case volumeMode: case readOnly:
case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Persistent"
+ + " Volume Claim type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>Empty Directory</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeEmptyDir(Config config, boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_EMPTYDIR_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String medium =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.medium);
+
+ if (medium != null && !medium.isEmpty() && !"Memory".equals(medium)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Empty Directory"
+ + " 'medium' must be 'Memory' or empty.", volume.getKey()));
+ }
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case sizeLimit: case medium: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Empty"
+ + " Directory type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>Host Path</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeHostPath(Config config, boolean isExecutor) {
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_HOSTPATH_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String type =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.type);
+ if (type != null &&
!KubernetesConstants.VALID_VOLUME_HOSTPATH_TYPES.contains(type)) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path"
+ + " 'type' of '%s' is invalid.", volume.getKey(), type));
+ }
+ final String hostOnPath =
+
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.pathOnHost);
+ if (hostOnPath == null || hostOnPath.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: Host
Path requires a"
+ + " path on the host.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+
+ switch (key) {
+ case type: case pathOnHost: case readOnly: case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Host Path"
+ + " option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>NFS</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
+ final Matcher matcher =
KubernetesConstants.VALID_LOWERCASE_RFC_1123_REGEX.matcher("");
+
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volumes =
+ getVolumeConfigs(config,
KubernetesContext.KUBERNETES_VOLUME_CLAIM_PREFIX, isExecutor);
+
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+
+ // Claim name is required.
+ if
(!volume.getValue().containsKey(KubernetesConstants.VolumeConfigKeys.claimName))
{
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Persistent Volume"
+ + " Claims require a `claimName`.", volume.getKey()));
+ }
+
+ for (Map.Entry<KubernetesConstants.VolumeConfigKeys, String> volumeConfig
+ : volume.getValue().entrySet()) {
+ final KubernetesConstants.VolumeConfigKeys key = volumeConfig.getKey();
+ final String value = volumeConfig.getValue();
+
+ switch (key) {
+ case claimName:
+ // Claim names which are not OnDemand should be lowercase RFC-1123.
+ if (!matcher.reset(value).matches()
+ &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value)) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `claimName` does"
+ + " not match lowercase RFC-1123 pattern", volume.getKey()));
+ }
+ break;
+ case storageClassName:
+ if (!matcher.reset(value).matches()) {
+ throw new TopologySubmissionException(String.format("Volume
`%s`: `storageClassName`"
+ + " does not match lowercase RFC-1123 pattern",
volume.getKey()));
+ }
+ break;
+ case sizeLimit: case accessModes: case volumeMode: case readOnly:
case path: case subPath:
+ break;
+ default:
+ throw new TopologySubmissionException(String.format("Volume `%s`:
Invalid Persistent"
+ + " Volume Claim type option for '%s'", volume.getKey(), key));
+ }
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>Empty Directory</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeEmptyDir(Config config, boolean isExecutor) {
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
##########
File path:
heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
##########
@@ -291,31 +309,202 @@ public static boolean
getPersistentVolumeClaimDisabled(Config config) {
volumes.put(volumeName, volume);
}
- /* Validate Claim and Storage Class names.
- [1] `claimNameNotOnDemand`: checks for a `claimName` which is not
`OnDemand`.
- [2] `storageClassName`: Check if it is the provided `option`.
- Conditions [1] OR [2] are True, then...
- [3] Check for a valid lowercase RFC-1123 pattern.
- */
- boolean claimNameNotOnDemand =
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.claimName.equals(key)
- &&
!KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(value);
- if ((claimNameNotOnDemand // [1]
- ||
-
KubernetesConstants.VolumeClaimTemplateConfigKeys.storageClassName.equals(key))
// [2]
- && !matcher.reset(value).matches()) { // [3]
- throw new TopologySubmissionException(
- String.format("Option `%s` value `%s` does not match lowercase
RFC-1123 pattern",
- key, value));
- }
-
volume.put(key, value);
}
} catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid Persistent Volume Claim CLI parameter
provided";
+ final String message = "Invalid Volume configuration option provided on
CLI";
LOG.log(Level.CONFIG, message);
throw new TopologySubmissionException(message);
}
+
+ // All Volumes must contain a path.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
volume
+ : volumes.entrySet()) {
+ final String path =
volume.getValue().get(KubernetesConstants.VolumeConfigKeys.path);
+ if (path == null || path.isEmpty()) {
+ throw new TopologySubmissionException(String.format("Volume `%s`: All
Volumes require a"
+ + " 'path'.", volume.getKey()));
+ }
+ }
+
+ return volumes;
+ }
+
+ /**
+ * Collects parameters form the <code>CLI</code> and validates options for
<code>PVC</code>s.
+ * @param config Contains the configuration options collected from the
<code>CLI</code>.
+ * @param isExecutor Flag used to collect CLI commands for the
<code>Executor</code> and <code>Manager</code>.
+ * @return A mapping between <code>Volumes</code> and their configuration
<code>key-value</code> pairs.
+ * Will return an empty list if there are no Volume Claim Templates to be
generated.
+ */
+ public static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
+ getVolumeClaimTemplates(Config config, boolean isExecutor) {
Review comment:
Would it be worthwhile to mark this parameters passed in the function as
`final`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]