This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 76e0b6b  [SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created
76e0b6b is described below

commit 76e0b6bafb66a5cd02edcf924a90fc389fddea8e
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Wed Feb 27 10:07:02 2019 -0800

    [SPARK-27002][SS] Get kafka delegation tokens right before consumer/producer created

    ## What changes were proposed in this pull request?

    Spark does not always pick up the latest Kafka delegation token, even when a new one has been obtained properly. In this PR I set the delegation tokens right before `KafkaConsumer` and `KafkaProducer` creation, to be on the safe side.

    ## How was this patch tested?

    Long-running Kafka-to-Kafka tests on a 4 node cluster with randomly thrown artificial exceptions.

    Test scenario:
    * 4 node cluster
    * Yarn
    * Kafka broker version 2.1.0
    * security.protocol = SASL_SSL
    * sasl.mechanism = SCRAM-SHA-512

    Kafka broker settings:
    * delegation.token.expiry.time.ms=600000 (10 min)
    * delegation.token.max.lifetime.ms=1200000 (20 min)
    * delegation.token.expiry.check.interval.ms=300000 (5 min)

    A new delegation token is obtained from the Kafka broker every 7.5 minutes (10 min * 0.75). But when the old token expires after 10 minutes (Spark obtains a new one and does not renew the old one), the broker's expiry thread runs every 5 minutes and invalidates the expired tokens, and an artificial exception is thrown inside the Spark application (in which case Spark closes the connection), the latest delegation token was not always picked up.

    Closes #23906 from gaborgsomogyi/SPARK-27002.

    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
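The change applies one pattern everywhere a consumer or producer is built: run the parameters through `KafkaConfigUpdater.setAuthenticationConfigIfNeeded()` at creation time, so whatever delegation token is valid at that moment is the one used. Below is a minimal sketch of that pattern, distilled from the diff that follows; the wrapping object name is made up for illustration, and the file is assumed to sit in the `org.apache.spark.sql.kafka010` package because `KafkaConfigUpdater` is `private[kafka010]`.

    // Sketch only: mirrors the pattern the diff applies in ConsumerStrategy,
    // KafkaDataConsumer and CachedKafkaProducer. The object name is hypothetical,
    // and the file is assumed to live in the kafka010 package so that the
    // private[kafka010] KafkaConfigUpdater is visible.
    package org.apache.spark.sql.kafka010

    import java.{util => ju}

    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.consumer.KafkaConsumer

    object TokenAwareConsumerFactory {

      def createConsumer(
          kafkaParams: ju.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] = {
        // Re-resolve the authentication config (including the currently valid
        // delegation token) right before the consumer is created, instead of
        // reusing parameters that were assembled earlier.
        val updatedKafkaParams = KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
          .setAuthenticationConfigIfNeeded()
          .build()
        new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
      }
    }

This is also why the three `setAuthenticationConfigIfNeeded()` calls are removed from `KafkaSourceProvider` at the end of the diff: the parameter maps built there no longer need to bake in authentication, since it is now resolved at consumer/producer creation time.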
---
 .../spark/sql/kafka010/CachedKafkaProducer.scala       |  8 ++++++--
 .../apache/spark/sql/kafka010/ConsumerStrategy.scala   | 18 +++++++++++++++---
 .../apache/spark/sql/kafka010/KafkaConfigUpdater.scala |  2 +-
 .../apache/spark/sql/kafka010/KafkaDataConsumer.scala  |  5 ++++-
 .../spark/sql/kafka010/KafkaSourceProvider.scala       |  3 ---
 5 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index cd680ad..f24001f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -64,8 +64,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     .build[Seq[(String, Object)], Producer](cacheLoader)
 
   private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
-    val kafkaProducer: Producer = new Producer(producerConfiguration)
-    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
+    val updatedKafkaProducerConfiguration =
+      KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap)
+        .setAuthenticationConfigIfNeeded()
+        .build()
+    val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration)
+    logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.")
     kafkaProducer
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index 66511b3..dfdafce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -37,6 +37,15 @@ import org.apache.kafka.common.TopicPartition
 sealed trait ConsumerStrategy {
   /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
   def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
+
+  /**
+   * Updates the parameters with security if needed.
+   * Added a function to hide internals and reduce code duplications because all strategy uses it.
+   */
+  protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
+    KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
 }
 
 /**
@@ -45,7 +54,8 @@ sealed trait ConsumerStrategy {
 case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.assign(ju.Arrays.asList(partitions: _*))
     consumer
   }
@@ -59,7 +69,8 @@ case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStr
 case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.subscribe(topics.asJava)
     consumer
   }
@@ -73,7 +84,8 @@ case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
 case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.subscribe(
       ju.regex.Pattern.compile(topicPattern),
       new NoOpConsumerRebalanceListener())
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
index 38bf5d7..978dfe6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -31,7 +31,7 @@ import org.apache.spark.kafka010.KafkaTokenUtil
 /**
  * Class to conveniently update Kafka config params, while logging the changes
  */
-private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
   extends Logging {
 
   private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 7b1314b..a0255a1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -197,7 +197,10 @@ private[kafka010] case class InternalKafkaConsumer(
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
-    val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+    val c = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6994517..a139573 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,7 +525,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -547,7 +546,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
      // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -582,7 +580,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setAuthenticationConfigIfNeeded()
       .build()
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org