Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22598#discussion_r222786239

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends Logging {
           this
         }
    
    +    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
    +      // There are multiple possibilities to log in:
    +      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
    +      //   configuration.
    +      // - Token not provided -> try to log in with JVM global security configuration,
    +      //   which can be configured for example with 'java.security.auth.login.config'.
    +      //   For this no additional parameter is needed.
    +      KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
    +        case Some(jaasParams) =>
    +          logInfo("Delegation token detected, using it for login.")
    +          val mechanism = kafkaParams
    +            .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
    +          require(mechanism.startsWith("SCRAM"),
    +            "Delegation token works only with SCRAM mechanism.")
    +          set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
    +        case None => // No params required
    +          logInfo("Delegation token not found.")
    --- End diff --
    
    I don't think anything should be logged in this case (which is currently the default). In which case this can become a `foreach` instead of a `match`.
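The refactor the reviewer suggests can be sketched as follows. This is a hypothetical, self-contained illustration only: `JaasConfigSketch`, its `getTokenJaasParams`, and the stored `saslJaasConfig` field are stand-ins for the real `KafkaSecurityHelper` / `ConfigUpdater` machinery, which this snippet does not depend on.

```scala
// Hypothetical sketch: when the None branch of a match is a no-op,
// Option.foreach expresses the intent more directly than pattern matching.
object JaasConfigSketch {
  // Stand-in for the SASL_JAAS_CONFIG entry written by ConfigUpdater.set(...)
  var saslJaasConfig: Option[String] = None

  // Stand-in for KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
  def getTokenJaasParams(conf: Map[String, String]): Option[String] =
    conf.get("kafka.jaas.params")

  def setTokenJaasConfigIfNeeded(conf: Map[String, String]): Unit =
    // foreach runs its body only when a token is present; when no token is
    // found nothing happens (and, per the review, nothing needs to be logged).
    getTokenJaasParams(conf).foreach { jaasParams =>
      saslJaasConfig = Some(jaasParams)
    }
}
```

The point of the suggestion is that `foreach` makes the "absent token is the unremarkable default" case explicit: there is no branch in which to accidentally log or do work for `None`.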