Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22598#discussion_r222083143
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -560,25 +553,56 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       }
     
       private[kafka010] def kafkaParamsForProducer(
    -      parameters: Map[String, String]): Map[String, String] = {
    +      parameters: Map[String, String]): ju.Map[String, Object] = {
         val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
         if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
           throw new IllegalArgumentException(
             s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
               + "are serialized with ByteArraySerializer.")
         }
     
    -    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
    -    {
    +    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) {
           throw new IllegalArgumentException(
             s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
               + "value are serialized with ByteArraySerializer.")
         }
    +
    +    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
    +
    +    val configUpdater = ConfigUpdater("executor", specifiedKafkaParams)
    +      .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
    +      .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
    +
    +    setTokenJaasConfig(specifiedKafkaParams, configUpdater)
    +
    +    configUpdater.build()
    +  }
    +
    +  private def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] = {
         parameters
           .keySet
           .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
           .map { k => k.drop(6).toString -> parameters(k) }
    -      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
    -      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
    +      .toMap
    +  }
    +
    +  private def setTokenJaasConfig(
    --- End diff --
    
    Moved.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to