Github user rvesse commented on a diff in the pull request: https://github.com/apache/spark/pull/22959#discussion_r231838596 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def getOption(key: String): Option[String] = sparkConf.getOption(key) } +private[spark] class KubernetesDriverConf( + sparkConf: SparkConf, + val appId: String, + val mainAppResource: MainAppResource, + val mainClass: String, + val appArgs: Array[String], + val pyFiles: Seq[String]) + extends KubernetesConf(sparkConf) { + + override val resourceNamePrefix: String = { + val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None + custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName)) + } + + override def labels: Map[String, String] = { + val presetLabels = Map( + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) + val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) + + presetLabels.keys.foreach { key => + require( + !driverCustomLabels.contains(key), + s"Label with key $key is not allowed as it is reserved for Spark bookkeeping operations.") + } + + driverCustomLabels ++ presetLabels + } + + override def environment: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) + } + + override def annotations: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) + } + + override def secretNamesToMountPaths: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX) + } + + override def secretEnvNamesToKeyRefs: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) + } + + override def volumes: Seq[KubernetesVolumeSpec] = { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) + } +} + +private[spark] class KubernetesExecutorConf( + sparkConf: SparkConf, + val appId: String, + val executorId: String, + val driverPod: Option[Pod]) + extends KubernetesConf(sparkConf) { + + override val resourceNamePrefix: String = { + get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse( + KubernetesConf.getResourceNamePrefix(appName)) + } + + override def labels: Map[String, String] = { + val presetLabels = Map( + SPARK_EXECUTOR_ID_LABEL -> executorId, + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) + + val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) + + presetLabels.keys.foreach { key => + require( + !executorCustomLabels.contains(key), + s"Custom executor labels cannot contain $key as it is reserved for Spark.") + } + + executorCustomLabels ++ presetLabels + } + + override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap + + override def annotations: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + } + + override def secretNamesToMountPaths: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) + } + + override def secretEnvNamesToKeyRefs: Map[String, String] = { + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) + } + + override def volumes: Seq[KubernetesVolumeSpec] = { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) + } + +} + private[spark] object KubernetesConf { def createDriverConf( sparkConf: SparkConf, - appName: String, - appResourceNamePrefix: String, appId: String, mainAppResource: MainAppResource, mainClass: String, appArgs: Array[String], - maybePyFiles: Option[String], - hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) - require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + - s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations.") - require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + - s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations.") - val driverLabels = driverCustomLabels ++ Map( - SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) - val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) - val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX) - val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) - val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) - val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get) - // Also parse executor volumes in order to verify configuration - // before the driver pod is created - KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) - KubernetesUtils.requireNandDefined( - hadoopConfDir, - hadoopConfigMapName, - "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " + - "as the creation of an additional ConfigMap, when one is already specified is extraneous" ) - val hadoopConfSpec = - if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) { - Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName)) - } else { - None - } - val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + maybePyFiles: Option[String]): KubernetesDriverConf = { + // Parse executor volumes in order to verify configuration before the driver pod is created. + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - - KubernetesConf( - sparkConf.clone(), - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), - appResourceNamePrefix, - appId, - driverLabels, - driverAnnotations, - driverSecretNamesToMountPaths, - driverSecretEnvNamesToKeyRefs, - driverEnvs, - driverVolumes, - hadoopConfSpec) + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs, + pyFiles) } def createExecutorConf( sparkConf: SparkConf, executorId: String, appId: String, - driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = { - val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) - require( - !executorCustomLabels.contains(SPARK_APP_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") - require( - !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + - " Spark.") - require( - !executorCustomLabels.contains(SPARK_ROLE_LABEL), - s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") - val executorLabels = Map( - SPARK_EXECUTOR_ID_LABEL -> executorId, - SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ - executorCustomLabels - val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) - val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) - val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) - val executorEnv = sparkConf.getExecutorEnv.toMap - val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - - // If no prefix is defined then we are in pure client mode - // (not the one used by cluster mode inside the container) - val appResourceNamePrefix = { - if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) { - getResourceNamePrefix(getAppName(sparkConf)) - } else { - sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) - } - } + driverPod: Option[Pod]): KubernetesExecutorConf = { + new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod) + } - KubernetesConf( - sparkConf.clone(), - KubernetesExecutorSpecificConf(executorId, driverPod), - appResourceNamePrefix, - appId, - executorLabels, - executorAnnotations, - executorMountSecrets, - executorEnvSecrets, - executorEnv, - executorVolumes, - None) + def getResourceNamePrefix(appName: String): String = { --- End diff -- Looks like an identical method appears in [KubernetesClientApplication](https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L263-L272) - can we reuse or move this to `KubernetesUtils`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org