This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 98a8725 [SPARK-27022][DSTREAMS] Add kafka delegation token support. 98a8725 is described below commit 98a8725e66ee992e8db7035449e73225a795b530 Author: Gabor Somogyi <gabor.g.somo...@gmail.com> AuthorDate: Thu Mar 7 11:36:37 2019 -0800 [SPARK-27022][DSTREAMS] Add kafka delegation token support. ## What changes were proposed in this pull request? It adds Kafka delegation token support for DStreams. Please be aware that, because a Kafka native sink is not available for DStreams, this PR uses delegation tokens only on the consumer side. What this PR contains: * Usage of the token through dynamic JAAS configuration * `KafkaConfigUpdater` moved to `kafka-0-10-token-provider` * `KafkaSecurityHelper` functionality moved into `KafkaTokenUtil` * Documentation ## How was this patch tested? Existing unit tests + on cluster. Long-running Kafka-to-file tests on a 4-node cluster with randomly thrown artificial exceptions. Test scenario: * 4-node cluster * YARN * Kafka broker version 2.1.0 * security.protocol = SASL_SSL * sasl.mechanism = SCRAM-SHA-512 Kafka broker settings: * delegation.token.expiry.time.ms=600000 (10 min) * delegation.token.max.lifetime.ms=1200000 (20 min) * delegation.token.expiry.check.interval.ms=300000 (5 min) Every 7.5 minutes a new delegation token was obtained from the Kafka broker (10 min * 0.75). When a token expired after 10 minutes (Spark obtains a new one and doesn't renew the old one), the broker's expiration thread, which runs every 5 minutes, invalidated the expired token. An artificial exception was then thrown inside the Spark application (in which case Spark closes the connection), and the latest delegation token was picked up correctly. The documentation was built with `cd docs/` followed by `SKIP_API=1 jekyll build`, and the resulting webpage was checked manually. Closes #23929 from gaborgsomogyi/SPARK-27022. 
Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- docs/streaming-kafka-0-10-integration.md | 7 +++ .../spark/sql/kafka010/CachedKafkaProducer.scala | 1 + .../spark/sql/kafka010/ConsumerStrategy.scala | 2 + .../spark/sql/kafka010/KafkaDataConsumer.scala | 1 + .../spark/sql/kafka010/KafkaSecurityHelper.scala | 53 ------------------- .../spark/sql/kafka010/KafkaSourceProvider.scala | 1 + .../sql/kafka010/KafkaSecurityHelperSuite.scala | 43 ---------------- external/kafka-0-10-token-provider/pom.xml | 5 ++ .../spark}/kafka010/KafkaConfigUpdater.scala | 9 ++-- .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 30 ++++++++++- .../spark}/kafka010/KafkaConfigUpdaterSuite.scala | 2 +- .../spark}/kafka010/KafkaDelegationTokenTest.scala | 3 +- .../spark/kafka010/KafkaTokenUtilSuite.scala | 59 ++++++++-------------- external/kafka-0-10/pom.xml | 5 ++ .../streaming/kafka010/ConsumerStrategy.scala | 19 +++++-- .../streaming/kafka010/KafkaDataConsumer.scala | 8 ++- 16 files changed, 101 insertions(+), 147 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 975adacca..3fb6271 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -315,3 +315,10 @@ As with any Spark applications, `spark-submit` is used to launch your applicatio For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-pro [...] 
+### Security + +See [Structured Streaming Security](structured-streaming-kafka-integration.html#security). + +##### Additional Caveats + +- Kafka native sink is not available so delegation token used only on consumer side. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index f24001f..062ce9a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -28,6 +28,7 @@ import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] object CachedKafkaProducer extends Logging { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala index dfdafce..2326619 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala @@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition +import org.apache.spark.kafka010.KafkaConfigUpdater + /** * Subscribe allows you to subscribe to a fixed collection of topics. * SubscribePattern allows you to use a regex to specify topics of interest. 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index a0255a1..83bf4b1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala deleted file mode 100644 index a11d54f..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import org.apache.hadoop.security.UserGroupInformation -import org.apache.kafka.common.security.scram.ScramLoginModule - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.kafka010.KafkaTokenUtil - -private[kafka010] object KafkaSecurityHelper extends Logging { - def isTokenAvailable(): Boolean = { - UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) != null - } - - def getTokenJaasParams(sparkConf: SparkConf): String = { - val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) - val username = new String(token.getIdentifier) - val password = new String(token.getPassword) - - val loginModuleName = classOf[ScramLoginModule].getName - val params = - s""" - |$loginModuleName required - | tokenauth=true - | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" - | username="$username" - | password="$password"; - """.stripMargin.replace("\n", "") - logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}") - - params - } -} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 4dc6955..b39e0d4 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.spark.internal.Logging +import 
org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala deleted file mode 100644 index d908bbf..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.kafka010 - -import org.apache.spark.{SparkConf, SparkFunSuite} - -class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest { - test("isTokenAvailable without token should return false") { - assert(!KafkaSecurityHelper.isTokenAvailable()) - } - - test("isTokenAvailable with token should return true") { - addTokenToUGI() - - assert(KafkaSecurityHelper.isTokenAvailable()) - } - - test("getTokenJaasParams with token should return scram module") { - addTokenToUGI() - - val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf()) - - assert(jaasParams.contains("ScramLoginModule required")) - assert(jaasParams.contains("tokenauth=true")) - assert(jaasParams.contains(tokenId)) - assert(jaasParams.contains(tokenPassword)) - } -} diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml index b2abcd9..40ef1f7 100644 --- a/external/kafka-0-10-token-provider/pom.xml +++ b/external/kafka-0-10-token-provider/pom.xml @@ -53,6 +53,11 @@ <version>${kafka.version}</version> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala similarity index 89% rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala rename to external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala index 978dfe6..d24eb4a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala +++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import java.{util => ju} @@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Kafka -import org.apache.spark.kafka010.KafkaTokenUtil /** * Class to conveniently update Kafka config params, while logging the changes */ -private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) +private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) extends Logging { private val map = new ju.HashMap[String, Object](kafkaParams.asJava) @@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map // configuration. if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) { logDebug("JVM global security configuration detected, using it for login.") - } else if (KafkaSecurityHelper.isTokenAvailable()) { + } else if (KafkaTokenUtil.isTokenAvailable()) { logDebug("Delegation token detected, using it for login.") - val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) + val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf) set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) require(mechanism.startsWith("SCRAM"), diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index 574d58b..e5604f2 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.scram.ScramLoginModule import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.spark.SparkConf @@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging { } } - private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = { + private def getKeytabJaasParams(sparkConf: SparkConf): String = { val params = s""" |${getKrb5LoginModuleName} required @@ -167,7 +168,7 @@ private[spark] object KafkaTokenUtil extends Logging { params } - def getTicketCacheJaasParams(sparkConf: SparkConf): String = { + private def getTicketCacheJaasParams(sparkConf: SparkConf): String = { val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) require(serviceName.nonEmpty, "Kerberos service name must be defined") @@ -208,4 +209,29 @@ private[spark] object KafkaTokenUtil extends Logging { dateFormat.format(tokenInfo.maxTimestamp))) } } + + def isTokenAvailable(): Boolean = { + UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) != null + } + + def getTokenJaasParams(sparkConf: SparkConf): String = { + val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) + val username = new String(token.getIdentifier) + val password = new String(token.getPassword) + + val loginModuleName = classOf[ScramLoginModule].getName + val params = + s""" + |$loginModuleName required + | tokenauth=true + | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" + | username="$username" + | password="$password"; + 
""".stripMargin.replace("\n", "") + logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}") + + params + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala similarity index 98% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala index 25ccca3..538486b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import org.apache.kafka.common.config.SaslConfigs diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala similarity index 97% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index d0cefc4..bd9b873 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import java.{util => ju} import javax.security.auth.login.{AppConfigurationEntry, Configuration} @@ -26,7 +26,6 @@ import org.mockito.Mockito.mock import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} -import org.apache.spark.kafka010.KafkaTokenUtil import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier /** diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 5da6260..0a5af1d 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -17,20 +17,17 @@ package org.apache.spark.kafka010 -import java.{util => ju} import java.security.PrivilegedExceptionAction -import javax.security.auth.login.{AppConfigurationEntry, Configuration} import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} -import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ -class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { +class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { private val bootStrapServers = "127.0.0.1:0" private val trustStoreLocation = "/path/to/trustStore" private val trustStorePassword = "trustStoreSecret" @@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { private var sparkConf: SparkConf = null - private class KafkaJaasConfiguration extends 
Configuration { - val entry = - new AppConfigurationEntry( - "DummyModule", - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, - ju.Collections.emptyMap[String, Object]() - ) - - override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = { - if (name.equals("KafkaClient")) { - Array(entry) - } else { - null - } - } - } - override def beforeEach(): Unit = { super.beforeEach() sparkConf = new SparkConf() } - override def afterEach(): Unit = { - try { - resetGlobalConfig() - } finally { - super.afterEach() - } - } - - private def setGlobalKafkaClientConfig(): Unit = { - Configuration.setConfiguration(new KafkaJaasConfiguration) - } - - private def resetGlobalConfig(): Unit = { - Configuration.setConfiguration(null) - } - test("checkProxyUser with proxy current user should throw exception") { val realUser = UserGroupInformation.createUserForTesting("realUser", Array()) UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs( @@ -229,4 +193,25 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided) } + + test("isTokenAvailable without token should return false") { + assert(!KafkaTokenUtil.isTokenAvailable()) + } + + test("isTokenAvailable with token should return true") { + addTokenToUGI() + + assert(KafkaTokenUtil.isTokenAvailable()) + } + + test("getTokenJaasParams with token should return scram module") { + addTokenToUGI() + + val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf()) + + assert(jaasParams.contains("ScramLoginModule required")) + assert(jaasParams.contains("tokenauth=true")) + assert(jaasParams.contains(tokenId)) + assert(jaasParams.contains(tokenPassword)) + } } diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 333572e..f78bdac 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -36,6 +36,11 @@ <dependencies> <dependency> 
<groupId>org.apache.spark</groupId> + <artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 07960d1..3e32b59 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater /** * Choice of how to create and configure underlying Kafka Consumers on driver and executors. @@ -54,6 +55,15 @@ abstract class ConsumerStrategy[K, V] { * checkpoint. */ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] + + /** + * Updates the parameters with security if needed. + * Added a function to hide internals and reduce code duplications because all strategy uses it. 
+ */ + protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) = + KafkaConfigUpdater("source", kafkaParams.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() } /** @@ -78,7 +88,8 @@ private case class Subscribe[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.subscribe(topics) val toSeek = if (currentOffsets.isEmpty) { offsets @@ -134,7 +145,8 @@ private case class SubscribePattern[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.subscribe(pattern, new NoOpConsumerRebalanceListener()) val toSeek = if (currentOffsets.isEmpty) { offsets @@ -186,7 +198,8 @@ private case class Assign[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.assign(topicPartitions) val toSeek = if (currentOffsets.isEmpty) { offsets diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 68c5fe9..142e946 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -19,11 +19,14 @@ package org.apache.spark.streaming.kafka010 import java.{util => ju} +import scala.collection.JavaConverters._ + import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] sealed trait KafkaDataConsumer[K, V] { /** @@ -109,7 +112,10 @@ private[kafka010] class InternalKafkaConsumer[K, V]( /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[K, V] = { - val c = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + val c = new KafkaConsumer[K, V](updatedKafkaParams) val topics = ju.Arrays.asList(topicPartition) c.assign(topics) c --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org