Github user ifilonenko commented on a diff in the pull request: https://github.com/apache/spark/pull/22911#discussion_r233648073 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala --- @@ -69,97 +72,191 @@ private[spark] class KerberosConfDriverFeatureStep( "If a Kerberos keytab is specified you must also specify a Kerberos principal") KubernetesUtils.requireBothOrNeitherDefined( - existingSecretName, - existingSecretItemKey, + existingDtSecret, + existingDtItemKey, "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + " you must also specify the name of the secret", "If a secret storing a Kerberos Delegation Token is specified you must also" + " specify the item-key where the data is stored") - private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { hConfDir => - HadoopBootstrapUtil.getHadoopConfFiles(hConfDir) + if (!hasKerberosConf) { + logInfo("You have not specified a krb5.conf file locally or via a ConfigMap. " + + "Make sure that you have the krb5.conf locally on the driver image.") } - private val newHadoopConfigMapName = - if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) { - Some(kubernetesConf.hadoopConfigMapName) - } else { - None - } - // Either use pre-existing secret or login to create new Secret with DT stored within - private val kerberosConfSpec: Option[KerberosConfigSpec] = (for { - secretName <- existingSecretName - secretItemKey <- existingSecretItemKey - } yield { - KerberosConfigSpec( - dtSecret = None, - dtSecretName = secretName, - dtSecretItemKey = secretItemKey, - jobUserName = kubeTokenManager.getCurrentUser.getShortUserName) - }).orElse( - if (isKerberosEnabled) { - Some(HadoopKerberosLogin.buildSpec( - conf, - kubernetesConf.appResourceNamePrefix, - kubeTokenManager)) - } else { - None + // Create delegation tokens if needed. This is a lazy val so that it's not populated + // unnecessarily. But it needs to be accessible to different methods in this class, + // since it's not clear based solely on available configuration options that delegation + // tokens are needed when other credentials are not available. + private lazy val delegationTokens: Array[Byte] = if (keytab.isEmpty && existingDtSecret.isEmpty) { + createDelegationTokens() + } else { + null + } + + private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_)) + + private def dtSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-delegation-tokens" + + private def ktSecretName: String = s"${kubernetesConf.appResourceNamePrefix}-kerberos-keytab" + + private def hasKerberosConf: Boolean = krb5CMap.isDefined | krb5File.isDefined + + override def configurePod(original: SparkPod): SparkPod = { + original.transform { case pod if hasKerberosConf => + val configMapVolume = if (krb5CMap.isDefined) { + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(krb5CMap.get) + .endConfigMap() + .build() + } else { + val krb5Conf = new File(krb5File.get) + new VolumeBuilder() + .withName(KRB_FILE_VOLUME) + .withNewConfigMap() + .withName(kubernetesConf.krbConfigMapName) + .withItems(new KeyToPathBuilder() + .withKey(krb5Conf.getName()) + .withPath(krb5Conf.getName()) + .build()) + .endConfigMap() + .build() + } + + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolumeLike(configMapVolume) + .endVolume() + .endSpec() + .build() + + val containerWithMount = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(KRB_FILE_VOLUME) + .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf") + .withSubPath("krb5.conf") + .endVolumeMount() + .build() + + SparkPod(podWithVolume, containerWithMount) + }.transform { + case pod if needKeytabUpload => + // If keytab is defined and is a submission-local file (not local: URI), then create a + // secret for it. The keytab data will be stored in this secret below. + val podWitKeytab = new PodBuilder(pod.pod) + .editOrNewSpec() + .addNewVolume() + .withName(KERBEROS_KEYTAB_VOLUME) + .withNewSecret() + .withSecretName(ktSecretName) + .endSecret() + .endVolume() + .endSpec() + .build() + + val containerWithKeytab = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(KERBEROS_KEYTAB_VOLUME) + .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT) + .endVolumeMount() + .build() + + SparkPod(podWitKeytab, containerWithKeytab) + + case pod if existingDtSecret.isDefined | delegationTokens != null => + val secretName = existingDtSecret.getOrElse(dtSecretName) + val itemKey = existingDtItemKey.getOrElse(KERBEROS_SECRET_KEY) + + val podWithTokens = new PodBuilder(pod.pod) + .editOrNewSpec() + .addNewVolume() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + + val containerWithTokens = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) + .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey") + .endEnv() + .build() + + SparkPod(podWithTokens, containerWithTokens) } - ) - - override def configurePod(pod: SparkPod): SparkPod = { - val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir( - hadoopConfDirSpec.hadoopConfDir, - newHadoopConfigMapName, - hadoopConfDirSpec.hadoopConfigMapName, - pod) - kerberosConfSpec.map { hSpec => - HadoopBootstrapUtil.bootstrapKerberosPod( - hSpec.dtSecretName, - hSpec.dtSecretItemKey, - hSpec.jobUserName, - krb5File, - Some(kubernetesConf.krbConfigMapName), - krb5CMap, - hadoopBasedSparkPod) - }.getOrElse( - HadoopBootstrapUtil.bootstrapSparkUserPod( - kubeTokenManager.getCurrentUser.getShortUserName, - hadoopBasedSparkPod)) } override def getAdditionalPodSystemProperties(): Map[String, String] = { - val resolvedConfValues = kerberosConfSpec.map { hSpec => - Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName, - KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey, - KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName, - KRB5_CONFIG_MAP_NAME -> krb5CMap.getOrElse(kubernetesConf.krbConfigMapName)) - }.getOrElse( - Map(KERBEROS_SPARK_USER_NAME -> - kubeTokenManager.getCurrentUser.getShortUserName)) - Map(HADOOP_CONFIG_MAP_NAME -> - hadoopConfDirSpec.hadoopConfigMapName.getOrElse( - kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues + // If a submission-local keytab is provided, update the Spark config so that it knows the + // path of the keytab in the driver container. + if (needKeytabUpload) { + val ktName = new File(keytab.get).getName() + Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName") + } else { + Map.empty + } } override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { - val hadoopConfConfigMap = for { - hName <- newHadoopConfigMapName - hFiles <- hadoopConfigurationFiles - } yield { - HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles) + Seq[HasMetadata]() ++ { + krb5File.map { path => + val file = new File(path) + new ConfigMapBuilder() + .withNewMetadata() + .withName(kubernetesConf.krbConfigMapName) + .endMetadata() + .addToData( + Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava) + .build() + } + } ++ { + // If a submission-local keytab is provided, stash it in a secret. + if (needKeytabUpload) { + val kt = new File(keytab.get) + Seq(new SecretBuilder() + .withNewMetadata() + .withName(ktSecretName) + .endMetadata() + .addToData(kt.getName(), Base64.encodeBase64String(Files.toByteArray(kt))) + .build()) + } else { + Nil + } + } ++ { + if (delegationTokens != null) { + Seq(new SecretBuilder() + .withNewMetadata() + .withName(dtSecretName) + .endMetadata() + .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(delegationTokens)) + .build()) + } else { + Nil + } } + } - val krb5ConfigMap = krb5File.map { fileLocation => - HadoopBootstrapUtil.buildkrb5ConfigMap( - kubernetesConf.krbConfigMapName, - fileLocation) + // Visible for testing. + def createDelegationTokens(): Array[Byte] = { --- End diff -- nit: Why isn't this logic done in a `KubernetesDelegationTokenManager` or some Util class? It seems we are over-running this step with a lot of functionality? no? You are also just mocking via `when(step).createDelegationToken()`, I personally don't agree with the coverage brought up by this method of unit-testing but this is an extension of the discussion on a separate PR. i.e. I would mock the tokenManager and test to ensure that the `obtainDelegationTokens` take in a particular `creds` as an argument (also, so that one could test the if statement below).`
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org