GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22911#discussion_r233653814
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala ---
    @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep(
         "If a Kerberos keytab is specified you must also specify a Kerberos 
principal")
     
       KubernetesUtils.requireBothOrNeitherDefined(
    -    existingSecretName,
    -    existingSecretItemKey,
    +    existingDtSecret,
    +    existingDtItemKey,
         "If a secret data item-key where the data of the Kerberos Delegation 
Token is specified" +
           " you must also specify the name of the secret",
         "If a secret storing a Kerberos Delegation Token is specified you must 
also" +
           " specify the item-key where the data is stored")
     
    -  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir =>
    -    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
    +  if (!hasKerberosConf) {
    +    logInfo("You have not specified a krb5.conf file locally or via a 
ConfigMap. " +
    +      "Make sure that you have the krb5.conf locally on the driver image.")
       }
    -  private val newHadoopConfigMapName =
    -    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
    -      Some(kubernetesConf.hadoopConfigMapName)
    -    } else {
    -      None
    -    }
     
    -  // Either use pre-existing secret or login to create new Secret with DT stored within
    -  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
    -    secretName <- existingSecretName
    -    secretItemKey <- existingSecretItemKey
    -  } yield {
    -    KerberosConfigSpec(
    -      dtSecret = None,
    -      dtSecretName = secretName,
    -      dtSecretItemKey = secretItemKey,
    -      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
    -  }).orElse(
    -    if (isKerberosEnabled) {
    -      Some(HadoopKerberosLogin.buildSpec(
    -        conf,
    -        kubernetesConf.appResourceNamePrefix,
    -        kubeTokenManager))
    -    } else {
    -      None
    +  // Create delegation tokens if needed. This is a lazy val so that it's not populated
    +  // unnecessarily. But it needs to be accessible to different methods in this class,
    +  // since it's not clear based solely on available configuration options that delegation
    +  // tokens are needed when other credentials are not available.
    +  private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) {
    +    createDelegationTokens()
    +  } else {
    +    null
    +  }
    +
    +  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
    +
    +  private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens"
    +
    +  private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab"
    +
    +  private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined
    +
    +  override def configurePod(original: SparkPod): SparkPod = {
    +    original.transform { case pod if hasKerberosConf =>
    +      val configMapVolume = if (krb5CMap.isDefined) {
    +        new VolumeBuilder()
    +          .withName(KRB_FILE_VOLUME)
    +          .withNewConfigMap()
    +            .withName(krb5CMap.get)
    +            .endConfigMap()
    +          .build()
    +      } else {
    +        val krb5Conf = new File(krb5File.get)
    +        new VolumeBuilder()
    +          .withName(KRB_FILE_VOLUME)
    +          .withNewConfigMap()
    +          .withName(kubernetesConf.krbConfigMapName)
    +          .withItems(new KeyToPathBuilder()
    +            .withKey(krb5Conf.getName())
    +            .withPath(krb5Conf.getName())
    +            .build())
    +          .endConfigMap()
    +          .build()
    +      }
    +
    +      val podWithVolume = new PodBuilder(pod.pod)
    +        .editSpec()
    +          .addNewVolumeLike(configMapVolume)
    +            .endVolume()
    +          .endSpec()
    +        .build()
    +
    +      val containerWithMount = new ContainerBuilder(pod.container)
    +        .addNewVolumeMount()
    +          .withName(KRB_FILE_VOLUME)
    +          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
    +          .withSubPath("krb5.conf")
    +          .endVolumeMount()
    +        .build()
    +
    +      SparkPod(podWithVolume, containerWithMount)
    +    }.transform {
    +      case pod if needKeytabUpload =>
    +        // If keytab is defined and is a submission-local file (not local: URI), then create a
    +        // secret for it. The keytab data will be stored in this secret below.
    +        val podWithKeytab = new PodBuilder(pod.pod)
    +          .editOrNewSpec()
    +            .addNewVolume()
    +              .withName(KERBEROS_KEYTAB_VOLUME)
    +              .withNewSecret()
    +                .withSecretName(ktSecretName)
    +                .endSecret()
    +              .endVolume()
    +            .endSpec()
    +          .build()
    +
    +        val containerWithKeytab = new ContainerBuilder(pod.container)
    +          .addNewVolumeMount()
    +            .withName(KERBEROS_KEYTAB_VOLUME)
    +            .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
    +            .endVolumeMount()
    +          .build()
    +
    +        SparkPod(podWithKeytab, containerWithKeytab)
    +
    +      case pod if existingDtSecret.isDefined | delegationTokens != null =>
    +        val secretName = existingDtSecret.getOrElse(dtSecretName)
    +        val itemKey = existingDtItemKey.getOrElse(KERBEROS_SECRET_KEY)
    +
    +        val podWithTokens = new PodBuilder(pod.pod)
    +          .editOrNewSpec()
    +            .addNewVolume()
    +              .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
    +              .withNewSecret()
    +                .withSecretName(secretName)
    +                .endSecret()
    +              .endVolume()
    +            .endSpec()
    +          .build()
    +
    +        val containerWithTokens = new ContainerBuilder(pod.container)
    +          .addNewVolumeMount()
    +            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
    +            .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
    +            .endVolumeMount()
    +          .addNewEnv()
    +            .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
    +            .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
    +            .endEnv()
    +          .build()
    +
    +        SparkPod(podWithTokens, containerWithTokens)
         }
    -  )
    -
    -  override def configurePod(pod: SparkPod): SparkPod = {
    -    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
    -      hadoopConfDirSpec.hadoopConfDir,
    -      newHadoopConfigMapName,
    -      hadoopConfDirSpec.hadoopConfigMapName,
    -      pod)
    -    kerberosConfSpec.map { hSpec =>
    -      HadoopBootstrapUtil.bootstrapKerberosPod(
    -        hSpec.dtSecretName,
    -        hSpec.dtSecretItemKey,
    -        hSpec.jobUserName,
    -        krb5File,
    -        Some(kubernetesConf.krbConfigMapName),
    -        krb5CMap,
    -        hadoopBasedSparkPod)
    -    }.getOrElse(
    -      HadoopBootstrapUtil.bootstrapSparkUserPod(
    -        kubeTokenManager.getCurrentUser.getShortUserName,
    -        hadoopBasedSparkPod))
       }
     
       override def getAdditionalPodSystemProperties(): Map[String, String] = {
    -    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
    -      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
    -        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
    -        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
    -        KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
    -      }.getOrElse(
    -        Map(KERBEROS_SPARK_USER_NAME ->
    -          kubeTokenManager.getCurrentUser.getShortUserName))
    -    Map(HADOOP_CONFIG_MAP_NAME ->
    -      hadoopConfDirSpec.hadoopConfigMapName.getOrElse(
    -      kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
    +    // If a submission-local keytab is provided, update the Spark config so that it knows the
    +    // path of the keytab in the driver container.
    +    if (needKeytabUpload) {
    +      val ktName = new File(keytab.get).getName()
    +      Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
    +    } else {
    +      Map.empty
    +    }
       }
     
       override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    -    val hadoopConfConfigMap = for {
    -      hName <- newHadoopConfigMapName
    -      hFiles <- hadoopConfigurationFiles
    -    } yield {
    -      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
    +    Seq[HasMetadata]() ++ {
    --- End diff --
    
    > seems like a lot of logic going into this step
    
    The logic itself isn't really different from before. The only difference is that the decision of what to do for each combination of configs now also lives here.
    
    It could be broken down into separate steps, but then you'd have three related steps that need to agree on how to treat the configuration of the driver; e.g. if the step that adds the keytab runs, then the step that creates delegation tokens shouldn't run. And that makes the code harder to follow, because you have to look at multiple places to understand how this works.
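    
    For illustration, here's a minimal, self-contained sketch of the decision being consolidated; the names and types (`Conf`, `Action`, `decide`) are hypothetical stand-ins for this discussion, not Spark's actual feature-step API:
    
        // Sketch of the mutually exclusive credential paths handled by this step.
        // All names here are hypothetical, for illustration only.
        object KerberosStepSketch {
          final case class Conf(
              keytab: Option[String],           // submission-local path or local: URI
              existingDtSecret: Option[String]) // pre-created delegation-token secret
    
          sealed trait Action
          case object UploadKeytab extends Action           // mount the keytab via a new secret
          case object MountExistingSecret extends Action    // reuse the user's DT secret
          case object CreateDelegationTokens extends Action // log in and create fresh tokens
    
          // Mirrors the ordering in configurePod(): a submission-local keytab wins,
          // then an existing secret; tokens are created only when neither is set,
          // matching the guard on the lazy `delegationTokens` val in the diff.
          def decide(conf: Conf, isLocalUri: String => Boolean): Option[Action] =
            if (conf.keytab.exists(kt => !isLocalUri(kt))) Some(UploadKeytab)
            else if (conf.existingDtSecret.isDefined) Some(MountExistingSecret)
            else if (conf.keytab.isEmpty) Some(CreateDelegationTokens)
            else None // keytab is a local: URI, already available in the image
    
          def main(args: Array[String]): Unit = {
            val isLocal = (uri: String) => uri.startsWith("local:")
            println(decide(Conf(Some("/secrets/user.keytab"), None), isLocal)) // Some(UploadKeytab)
            println(decide(Conf(None, Some("my-dt-secret")), isLocal))         // Some(MountExistingSecret)
            println(decide(Conf(None, None), isLocal))                         // Some(CreateDelegationTokens)
          }
        }
    
    Splitting this into separate steps would spread these branches across files that all have to stay consistent with each other.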

