GitHub user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22598#discussion_r223354097
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends Logging {
           this
         }
     
    +    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
    +      // There are multiple possibilities to log in:
    +      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
    +      //   configuration.
    +      // - Token not provided -> try to log in with JVM global security configuration
    +      //   which can be configured for example with 'java.security.auth.login.config'.
    +      //   For this no additional parameter needed.
    +      KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
    +        case Some(jaasParams) =>
    +          logInfo("Delegation token detected, using it for login.")
    +          val mechanism = kafkaParams
    +            .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
    +          require(mechanism.startsWith("SCRAM"),
    +            "Delegation token works only with SCRAM mechanism.")
    +          set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
    +        case None => // No params required
    +          logInfo("Delegation token not found.")
    --- End diff --
    
    Fixed.
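
For readers following the thread, the decision described in the diff's comments can be sketched in isolation roughly as below. This is a simplified illustration, not the code in the pull request: `TokenJaasSketch` and `withTokenJaasConfig` are hypothetical names, the `ConfigUpdater` and `KafkaSecurityHelper` pieces are replaced by a plain `Map` and an `Option[String]`, and the Kafka configuration keys are written out as string literals so the sketch needs no Kafka dependency.

```scala
// Hypothetical, simplified stand-in for the logic shown in the diff above.
object TokenJaasSketch {
  // Literal values of the Kafka SaslConfigs constants used in the diff.
  val SaslMechanism = "sasl.mechanism"
  val SaslJaasConfig = "sasl.jaas.config"
  val DefaultSaslMechanism = "GSSAPI"

  // Returns the Kafka parameters to use, given optional token-based JAAS parameters.
  def withTokenJaasConfig(
      kafkaParams: Map[String, String],
      tokenJaasParams: Option[String]): Map[String, String] = tokenJaasParams match {
    case Some(jaasParams) =>
      // Token available: it can only be used with a SCRAM mechanism.
      val mechanism = kafkaParams.getOrElse(SaslMechanism, DefaultSaslMechanism)
      require(mechanism.startsWith("SCRAM"),
        "Delegation token works only with SCRAM mechanism.")
      kafkaParams + (SaslJaasConfig -> jaasParams)
    case None =>
      // No token: leave the parameters untouched and rely on the JVM-wide
      // configuration (for example 'java.security.auth.login.config').
      kafkaParams
  }
}
```

With this sketch, passing a token together with `sasl.mechanism` set to a SCRAM variant adds `sasl.jaas.config` to the parameters, while passing a token with the default mechanism fails the `require` check.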

