GitHub user rvesse commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22959#discussion_r231838596
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
    @@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
       def getOption(key: String): Option[String] = sparkConf.getOption(key)
     }
     
    +private[spark] class KubernetesDriverConf(
    +    sparkConf: SparkConf,
    +    val appId: String,
    +    val mainAppResource: MainAppResource,
    +    val mainClass: String,
    +    val appArgs: Array[String],
    +    val pyFiles: Seq[String])
    +  extends KubernetesConf(sparkConf) {
    +
    +  override val resourceNamePrefix: String = {
    +    val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
    +    custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
    +  }
    +
    +  override def labels: Map[String, String] = {
    +    val presetLabels = Map(
    +      SPARK_APP_ID_LABEL -> appId,
    +      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
    +    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
    +      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
    +
    +    presetLabels.keys.foreach { key =>
    +      require(
    +        !driverCustomLabels.contains(key),
    +        s"Label with key $key is not allowed as it is reserved for Spark 
bookkeeping operations.")
    +    }
    +
    +    driverCustomLabels ++ presetLabels
    +  }
    +
    +  override def environment: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
    +  }
    +
    +  override def annotations: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
    +  }
    +
    +  override def secretNamesToMountPaths: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
    +  }
    +
    +  override def secretEnvNamesToKeyRefs: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
    +  }
    +
    +  override def volumes: Seq[KubernetesVolumeSpec] = {
    +    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
    +  }
    +}
    +
    +private[spark] class KubernetesExecutorConf(
    +    sparkConf: SparkConf,
    +    val appId: String,
    +    val executorId: String,
    +    val driverPod: Option[Pod])
    +  extends KubernetesConf(sparkConf) {
    +
    +  override val resourceNamePrefix: String = {
    +    get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
    +      KubernetesConf.getResourceNamePrefix(appName))
    +  }
    +
    +  override def labels: Map[String, String] = {
    +    val presetLabels = Map(
    +      SPARK_EXECUTOR_ID_LABEL -> executorId,
    +      SPARK_APP_ID_LABEL -> appId,
    +      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
    +
    +    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
    +      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
    +
    +    presetLabels.keys.foreach { key =>
    +      require(
    +        !executorCustomLabels.contains(key),
    +        s"Custom executor labels cannot contain $key as it is reserved for 
Spark.")
    +    }
    +
    +    executorCustomLabels ++ presetLabels
    +  }
    +
    +  override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap
    +
    +  override def annotations: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    +  }
    +
    +  override def secretNamesToMountPaths: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
    +  }
    +
    +  override def secretEnvNamesToKeyRefs: Map[String, String] = {
    +    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
    +  }
    +
    +  override def volumes: Seq[KubernetesVolumeSpec] = {
    +    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
    +  }
    +
    +}
    +
     private[spark] object KubernetesConf {
       def createDriverConf(
           sparkConf: SparkConf,
    -      appName: String,
    -      appResourceNamePrefix: String,
           appId: String,
           mainAppResource: MainAppResource,
           mainClass: String,
           appArgs: Array[String],
    -      maybePyFiles: Option[String],
    -      hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
    -    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
    -    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
    -      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
    -      "operations.")
    -    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
    -      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
    -      "operations.")
    -    val driverLabels = driverCustomLabels ++ Map(
    -      SPARK_APP_ID_LABEL -> appId,
    -      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
    -    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
    -    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
    -    val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
    -    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
    -    val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
    -      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
    -    // Also parse executor volumes in order to verify configuration
    -    // before the driver pod is created
    -    KubernetesVolumeUtils.parseVolumesWithPrefix(
    -      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
    -
    -    val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
    -    KubernetesUtils.requireNandDefined(
    -      hadoopConfDir,
    -      hadoopConfigMapName,
    -      "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the 
ConfigMap " +
    -      "as the creation of an additional ConfigMap, when one is already 
specified is extraneous" )
    -    val hadoopConfSpec =
    -      if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
    -        Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
    -      } else {
    -        None
    -      }
    -    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
    +      maybePyFiles: Option[String]): KubernetesDriverConf = {
    +    // Parse executor volumes in order to verify configuration before the driver pod is created.
    +    KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
     
    -
    -    KubernetesConf(
    -      sparkConf.clone(),
    -      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
    -      appResourceNamePrefix,
    -      appId,
    -      driverLabels,
    -      driverAnnotations,
    -      driverSecretNamesToMountPaths,
    -      driverSecretEnvNamesToKeyRefs,
    -      driverEnvs,
    -      driverVolumes,
    -      hadoopConfSpec)
    +    val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
    +    new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
    +      pyFiles)
       }
     
       def createExecutorConf(
           sparkConf: SparkConf,
           executorId: String,
           appId: String,
    -      driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
    -    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
    -    require(
    -      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
    -      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
    -    require(
    -      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
    -      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as 
it is reserved for" +
    -        " Spark.")
    -    require(
    -      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
    -      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is 
reserved for Spark.")
    -    val executorLabels = Map(
    -      SPARK_EXECUTOR_ID_LABEL -> executorId,
    -      SPARK_APP_ID_LABEL -> appId,
    -      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
    -      executorCustomLabels
    -    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    -    val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
    -    val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
    -      sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
    -    val executorEnv = sparkConf.getExecutorEnv.toMap
    -    val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
    -      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
    -
    -    // If no prefix is defined then we are in pure client mode
    -    // (not the one used by cluster mode inside the container)
    -    val appResourceNamePrefix = {
    -      if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
    -        getResourceNamePrefix(getAppName(sparkConf))
    -      } else {
    -        sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
    -      }
    -    }
    +      driverPod: Option[Pod]): KubernetesExecutorConf = {
    +    new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
    +  }
     
    -    KubernetesConf(
    -      sparkConf.clone(),
    -      KubernetesExecutorSpecificConf(executorId, driverPod),
    -      appResourceNamePrefix,
    -      appId,
    -      executorLabels,
    -      executorAnnotations,
    -      executorMountSecrets,
    -      executorEnvSecrets,
    -      executorEnv,
    -      executorVolumes,
    -      None)
    +  def getResourceNamePrefix(appName: String): String = {
    --- End diff ---
    
    Looks like an identical method appears in [KubernetesClientApplication](https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L263-L272) - can we reuse or move this to `KubernetesUtils`?
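    
    For illustration only, the shared helper in `KubernetesUtils` might look roughly like the sketch below. The method name matches the diff, but the body is a placeholder: the real implementation would be moved verbatim from the existing duplicate rather than rewritten.
    
    ```scala
    package org.apache.spark.deploy.k8s
    
    import java.util.Locale
    
    private[spark] object KubernetesUtils {
    
      // Sketch: derive a unique resource name prefix from the app name.
      // The normalization below is illustrative, not Spark's actual logic;
      // the idea is just that Kubernetes resource names only allow
      // lowercase alphanumerics and '-'.
      def getResourceNamePrefix(appName: String): String = {
        val launchTime = System.currentTimeMillis()
        s"$appName-$launchTime"
          .trim
          .toLowerCase(Locale.ROOT)
          .replaceAll("\\s+", "-")
          .replaceAll("[^a-z0-9\\-]", "-")
          .replaceAll("-+", "-")
      }
    }
    ```
    
    `KubernetesConf` and `KubernetesClientApplication` could then both delegate to this single helper instead of maintaining two copies.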

