Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r222083143 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -560,25 +553,56 @@ private[kafka010] object KafkaSourceProvider extends Logging { } private[kafka010] def kafkaParamsForProducer( - parameters: Map[String, String]): Map[String, String] = { + parameters: Map[String, String]): ju.Map[String, Object] = { val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { throw new IllegalArgumentException( s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " + "are serialized with ByteArraySerializer.") } - if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) - { + if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) { throw new IllegalArgumentException( s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " + "value are serialized with ByteArraySerializer.") } + + val specifiedKafkaParams = convertToSpecifiedParams(parameters) + + val configUpdater = ConfigUpdater("executor", specifiedKafkaParams) + .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName) + .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName) + + setTokenJaasConfig(specifiedKafkaParams, configUpdater) + + configUpdater.build() + } + + private def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] = { parameters .keySet .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) .map { k => k.drop(6).toString -> parameters(k) } - .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) + .toMap + } + + private def setTokenJaasConfig( --- End diff -- Moved.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org