This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2f55809  [SPARK-27294][SS] Add multi-cluster Kafka delegation token
2f55809 is described below

commit 2f558094257c38d26650049f2ac93be6d65d6d85
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Tue May 7 11:40:43 2019 -0700

    [SPARK-27294][SS] Add multi-cluster Kafka delegation token

    ## What changes were proposed in this pull request?

    The current implementation doesn't support multi-cluster Kafka connections with delegation tokens. This PR adds that functionality. What this PR contains:
    * A new way of configuration
    * Obtain/store/use functionality for multiple delegation tokens
    * Documentation
    * The change also works with DStreams

    ## How was this patch tested?

    Existing + additional unit tests. Additionally tested on a cluster.

    Test scenario:
    * 2 * 4-node clusters
    * The two sets of 4 nodes are in different Kerberos realms
    * Cross-realm trust between the 2 realms
    * YARN
    * Kafka broker version 2.1.0
    * security.protocol = SASL_SSL
    * sasl.mechanism = SCRAM-SHA-512
    * Artificial exceptions during processing
    * Source reads from realm1, sink writes to realm2

    Kafka broker settings:
    * delegation.token.expiry.time.ms=600000 (10 min)
    * delegation.token.max.lifetime.ms=1200000 (20 min)
    * delegation.token.expiry.check.interval.ms=300000 (5 min)

    Closes #24305 from gaborgsomogyi/SPARK-27294.

    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/internal/config/Kafka.scala | 91 ---------------
 docs/structured-streaming-kafka-integration.md | 103 ++++++++++++++++
 .../spark/sql/kafka010/CachedKafkaProducer.scala | 14 +--
 .../spark/sql/kafka010/ConsumerStrategy.scala | 10 +-
 .../spark/sql/kafka010/KafkaDataConsumer.scala | 2 +-
 .../apache/spark/kafka010/KafkaConfigUpdater.scala | 21 ++-
 .../kafka010/KafkaDelegationTokenProvider.scala | 47 ++++++--
 .../spark/kafka010/KafkaTokenSparkConf.scala | 96 ++++++++++++++++
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 102 ++++++++++------
 .../spark/kafka010/KafkaConfigUpdaterSuite.scala | 45 ++++++--
 .../spark/kafka010/KafkaDelegationTokenTest.scala | 7 +-
 .../spark/kafka010/KafkaTokenSparkConfSuite.scala | 128 +++++++++++++++++++++
 .../spark/kafka010/KafkaTokenUtilSuite.scala | 123 ++++++++++++--------
 external/kafka-0-10/pom.xml | 5 +
 .../kafka010/KafkaDataConsumerSuite.scala | 10 +-
 15 files changed, 573 insertions(+), 231 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala deleted file mode 100644 index e91ddd3..0000000 --- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.internal.config - -private[spark] object Kafka { - - val BOOTSTRAP_SERVERS = - ConfigBuilder("spark.kafka.bootstrap.servers") - .doc("A list of coma separated host/port pairs to use for establishing the initial " + - "connection to the Kafka cluster. For further details please see kafka documentation. " + - "Only used to obtain delegation token.") - .stringConf - .createOptional - - val SECURITY_PROTOCOL = - ConfigBuilder("spark.kafka.security.protocol") - .doc("Protocol used to communicate with brokers. For further details please see kafka " + - "documentation. Only used to obtain delegation token.") - .stringConf - .createWithDefault("SASL_SSL") - - val KERBEROS_SERVICE_NAME = - ConfigBuilder("spark.kafka.sasl.kerberos.service.name") - .doc("The Kerberos principal name that Kafka runs as. This can be defined either in " + - "Kafka's JAAS config or in Kafka's config. For further details please see kafka " + - "documentation. Only used to obtain delegation token.") - .stringConf - .createWithDefault("kafka") - - val TRUSTSTORE_LOCATION = - ConfigBuilder("spark.kafka.ssl.truststore.location") - .doc("The location of the trust store file. For further details please see kafka " + - "documentation. Only used to obtain delegation token.") - .stringConf - .createOptional - - val TRUSTSTORE_PASSWORD = - ConfigBuilder("spark.kafka.ssl.truststore.password") - .doc("The store password for the trust store file. This is optional for client and only " + - "needed if ssl.truststore.location is configured. For further details please see kafka " + - "documentation. Only used to obtain delegation token.") - .stringConf - .createOptional - - val KEYSTORE_LOCATION = - ConfigBuilder("spark.kafka.ssl.keystore.location") - .doc("The location of the key store file. This is optional for client and can be used for " + - "two-way authentication for client. For further details please see kafka documentation. " + - "Only used to obtain delegation token.") - .stringConf - .createOptional - - val KEYSTORE_PASSWORD = - ConfigBuilder("spark.kafka.ssl.keystore.password") - .doc("The store password for the key store file. This is optional for client and only " + - "needed if ssl.keystore.location is configured. For further details please see kafka " + - "documentation. Only used to obtain delegation token.") - .stringConf - .createOptional - - val KEY_PASSWORD = - ConfigBuilder("spark.kafka.ssl.key.password") - .doc("The password of the private key in the key store file. This is optional for client. " + - "For further details please see kafka documentation. Only used to obtain delegation token.") - .stringConf - .createOptional - - val TOKEN_SASL_MECHANISM = - ConfigBuilder("spark.kafka.sasl.token.mechanism") - .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " + - "login module used for authentication a compatible mechanism has to be set here. " + - "For further details please see kafka documentation (sasl.mechanism). 
Only used to " + - "authenticate against Kafka broker with delegation token.") - .stringConf - .createWithDefault("SCRAM-SHA-512") -} diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 9e1bbc0..4a295e0 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -676,7 +676,7 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.clusters.${cluster}.auth.bootstrap.servers` is set, Spark considers the following log in options, in order of preference: - **JAAS login configuration**, please see example below. - **Keytab file**, such as, @@ -684,13 +684,13 @@ Spark considers the following log in options, in order of preference: ./bin/spark-submit \ --keytab <KEYTAB_FILE> \ --principal <PRINCIPAL> \ - --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \ + --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ... - **Kerberos credential cache**, such as, ./bin/spark-submit \ - --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \ + --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ... The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). @@ -703,10 +703,103 @@ Kafka broker configuration): After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter +`spark.kafka.clusters.${cluster}.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter must match with Kafka broker configuration. -When delegation token is available on an executor it can be overridden with JAAS login configuration. +When delegation token is available on an executor Spark considers the following log in options, in order of preference: +- **JAAS login configuration**, please see example below. +- **Delegation token**, please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> parameter for further details. + +When none of the above applies then unsecure connection assumed. + + +#### Configuration + +Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations. + +<table class="table"> +<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td> + <td>None</td> + <td> + A list of coma separated host/port pairs to use for establishing the initial connection + to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token. 
+ </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td> + <td>.*</td> + <td> + Regular expression to match against the <code>bootstrap.servers</code> config for sources and sinks in the application. + If a server address matches this regex, the delegation token obtained from the respective bootstrap servers will be used when connecting. + If multiple clusters match the address, an exception will be thrown and the query won't be started. + Kafka's secure and unsecure listeners are bound to different ports. When both used the secure listener port has to be part of the regular expression. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td> + <td>SASL_SSL</td> + <td> + Protocol used to communicate with brokers. For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td> + <td>kafka</td> + <td> + The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. + For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td> + <td>None</td> + <td> + The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td> + <td>None</td> + <td> + The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured. + For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td> + <td>None</td> + <td> + The location of the key store file. This is optional for client and can be used for two-way authentication for client. + For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td> + <td>None</td> + <td> + The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured. + For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td> + <td>None</td> + <td> + The password of the private key in the key store file. This is optional for client. + For further details please see Kafka documentation. Only used to obtain delegation token. + </td> + </tr> + <tr> + <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td> + <td>SCRAM-SHA-512</td> + <td> + SASL mechanism used for client connections with delegation token. Because SCRAM login module used for authentication a compatible mechanism has to be set here. + For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token. 
+ </td> + </tr> +</table> #### Caveats diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 062ce9a..2bab287 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -65,12 +65,8 @@ private[kafka010] object CachedKafkaProducer extends Logging { .build[Seq[(String, Object)], Producer](cacheLoader) private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = { - val updatedKafkaProducerConfiguration = - KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap) - .setAuthenticationConfigIfNeeded() - .build() - val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration) - logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.") + val kafkaProducer: Producer = new Producer(producerConfiguration) + logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.") kafkaProducer } @@ -80,7 +76,11 @@ private[kafka010] object CachedKafkaProducer extends Logging { * one instance per specified kafkaParams. */ private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = { - val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams) + val updatedKafkaProducerConfiguration = + KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration) try { guavaCache.get(paramsSeq) } catch { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala index 2326619..7bb829c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala @@ -36,7 +36,7 @@ import org.apache.spark.kafka010.KafkaConfigUpdater * All three strategies have overloaded constructors that allow you to specify * the starting offset for a particular partition. */ -sealed trait ConsumerStrategy { +private[kafka010] sealed trait ConsumerStrategy { /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */ def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] @@ -53,7 +53,8 @@ sealed trait ConsumerStrategy { /** * Specify a fixed collection of partitions. */ -case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy { +private[kafka010] case class AssignStrategy(partitions: Array[TopicPartition]) + extends ConsumerStrategy { override def createConsumer( kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = { val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) @@ -68,7 +69,7 @@ case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStr /** * Subscribe to a fixed collection of topics. 
*/ -case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy { +private[kafka010] case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy { override def createConsumer( kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = { val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) @@ -83,7 +84,8 @@ case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy { /** * Use a regex to specify topics of interest. */ -case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy { +private[kafka010] case class SubscribePatternStrategy(topicPattern: String) + extends ConsumerStrategy { override def createConsumer( kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = { val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 83bf4b1..45ea3d2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -347,7 +347,7 @@ private[kafka010] case class InternalKafkaConsumer( * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the * next offset to fetch. * - * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will + * This method also will try the best to detect data loss. If `failOnDataLoss` is `true`, it will * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this * method will return `null` if the next available record is within [offset, untilOffset). * diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala index d24eb4a..38f3b98 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala @@ -21,11 +21,11 @@ import java.{util => ju} import scala.collection.JavaConverters._ +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.Kafka /** * Class to conveniently update Kafka config params, while logging the changes @@ -57,14 +57,17 @@ private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[St // configuration. 
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) { logDebug("JVM global security configuration detected, using it for login.") - } else if (KafkaTokenUtil.isTokenAvailable()) { - logDebug("Delegation token detected, using it for login.") - val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf) - set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) - val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) - require(mechanism.startsWith("SCRAM"), - "Delegation token works only with SCRAM mechanism.") - set(SaslConfigs.SASL_MECHANISM, mechanism) + } else { + val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf, + map.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String]) + clusterConfig.foreach { clusterConf => + logDebug("Delegation token detected, using it for login.") + val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf) + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + require(clusterConf.tokenMechanism.startsWith("SCRAM"), + "Delegation token works only with SCRAM mechanism.") + set(SaslConfigs.SASL_MECHANISM, clusterConf.tokenMechanism) + } } this } diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala index da12e51..69fcf55 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala @@ -25,7 +25,6 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S import org.apache.spark.SparkConf import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.Kafka import org.apache.spark.security.HadoopDelegationTokenProvider private[spark] class KafkaDelegationTokenProvider @@ -37,25 +36,49 @@ private[spark] class KafkaDelegationTokenProvider hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long] = { + var lowestNextRenewalDate: Option[Long] = None try { - logDebug("Attempting to fetch Kafka security token.") - val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf) - creds.addToken(token.getService, token) - return Some(nextRenewalDate) + KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).foreach { clusterConf => + try { + if (delegationTokensRequired(clusterConf)) { + logDebug( + s"Attempting to fetch Kafka security token for cluster ${clusterConf.identifier}.") + val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf) + creds.addToken(token.getService, token) + if (lowestNextRenewalDate.isEmpty || nextRenewalDate < lowestNextRenewalDate.get) { + lowestNextRenewalDate = Some(nextRenewalDate) + } + } else { + logDebug( + s"Cluster ${clusterConf.identifier} does not require delegation token, skipping.") + } + } catch { + case NonFatal(e) => + logWarning(s"Failed to get token from service: $serviceName " + + s"cluster: ${clusterConf.identifier}", e) + } + } } catch { case NonFatal(e) => - logWarning(s"Failed to get token from service $serviceName", e) + logWarning(s"Failed to get token cluster configuration", e) } - None + lowestNextRenewalDate } override def delegationTokensRequired( sparkConf: SparkConf, hadoopConf: Configuration): Boolean = { - val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL) - sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) && - 
(protocol == SASL_SSL.name || - protocol == SSL.name || - protocol == SASL_PLAINTEXT.name) + try { + KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).exists(delegationTokensRequired(_)) + } catch { + case NonFatal(e) => + logWarning(s"Failed to get token cluster configuration", e) + false + } } + + private def delegationTokensRequired(clusterConf: KafkaTokenClusterConf): Boolean = + clusterConf.securityProtocol == SASL_SSL.name || + clusterConf.securityProtocol == SSL.name || + clusterConf.securityProtocol == SASL_PLAINTEXT.name } diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala new file mode 100644 index 0000000..84d58d8 --- /dev/null +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kafka010 + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} +import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + +private[spark] case class KafkaTokenClusterConf( + identifier: String, + authBootstrapServers: String, + targetServersRegex: String, + securityProtocol: String, + kerberosServiceName: String, + trustStoreLocation: Option[String], + trustStorePassword: Option[String], + keyStoreLocation: Option[String], + keyStorePassword: Option[String], + keyPassword: Option[String], + tokenMechanism: String) { + override def toString: String = s"KafkaTokenClusterConf{" + + s"identifier=$identifier, " + + s"authBootstrapServers=$authBootstrapServers, " + + s"targetServersRegex=$targetServersRegex, " + + s"securityProtocol=$securityProtocol, " + + s"kerberosServiceName=$kerberosServiceName, " + + s"trustStoreLocation=$trustStoreLocation, " + + s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " + + s"keyStoreLocation=$keyStoreLocation, " + + s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " + + s"keyPassword=${keyPassword.map(_ => "xxx")}, " + + s"tokenMechanism=$tokenMechanism}" +} + +private [kafka010] object KafkaTokenSparkConf extends Logging { + val CLUSTERS_CONFIG_PREFIX = "spark.kafka.clusters." + val DEFAULT_TARGET_SERVERS_REGEX = ".*" + val DEFAULT_SASL_KERBEROS_SERVICE_NAME = "kafka" + val DEFAULT_SASL_TOKEN_MECHANISM = "SCRAM-SHA-512" + + def getClusterConfig(sparkConf: SparkConf, identifier: String): KafkaTokenClusterConf = { + val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier." 
+ val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap + val result = KafkaTokenClusterConf( + identifier, + sparkClusterConf + .getOrElse(s"auth.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}", + throw new NoSuchElementException( + s"${configPrefix}auth.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}")), + sparkClusterConf.getOrElse(s"target.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}.regex", + KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX), + sparkClusterConf.getOrElse(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_SSL.name), + sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, + KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME), + sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), + sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), + sparkClusterConf.getOrElse("sasl.token.mechanism", + KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) + ) + logDebug(s"getClusterConfig($identifier): $result") + result + } + + def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf] = { + sparkConf.getAllWithPrefix(KafkaTokenSparkConf.CLUSTERS_CONFIG_PREFIX).toMap.keySet + .flatMap { k => + val split = k.split('.') + if (split.length > 0 && split(0).nonEmpty) { + Some(split(0)) + } else { + None + } + }.map(getClusterConfig(sparkConf, _)) + } +} diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index 3f9a593..7078b4f 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -19,7 +19,9 @@ package org.apache.spark.kafka010 import java.{util => ju} import java.text.SimpleDateFormat +import java.util.regex.Pattern +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.hadoop.io.Text @@ -38,20 +40,28 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils private[spark] object KafkaTokenUtil extends Logging { val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") - val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + private val TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token" + + private[kafka010] def getTokenService(identifier: String): Text = + new Text(s"$TOKEN_SERVICE_PREFIX.$identifier") + + private def getClusterIdentifier(service: Text): String = + service.toString().replace(s"$TOKEN_SERVICE_PREFIX.", "") private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { override def getKind: Text = TOKEN_KIND } - private[kafka010] def obtainToken(sparkConf: SparkConf): - (Token[KafkaDelegationTokenIdentifier], Long) = { + private[kafka010] def obtainToken( + sparkConf: SparkConf, + clusterConf: KafkaTokenClusterConf): (Token[KafkaDelegationTokenIdentifier], Long) = { checkProxyUser() - val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) + val adminClient = AdminClient.create(createAdminClientProperties(sparkConf, clusterConf)) val 
createDelegationTokenOptions = new CreateDelegationTokenOptions() val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) val token = createResult.delegationToken().get() @@ -61,7 +71,7 @@ private[spark] object KafkaTokenUtil extends Logging { token.tokenInfo.tokenId.getBytes, token.hmacAsBase64String.getBytes, TOKEN_KIND, - TOKEN_SERVICE + getTokenService(clusterConf.identifier) ), token.tokenInfo.expiryTimestamp) } @@ -73,23 +83,23 @@ private[spark] object KafkaTokenUtil extends Logging { "user is not yet supported.") } - private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { + private[kafka010] def createAdminClientProperties( + sparkConf: SparkConf, + clusterConf: KafkaTokenClusterConf): ju.Properties = { val adminClientProperties = new ju.Properties - val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS) - require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + - "servers not configured.") - adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + clusterConf.authBootstrapServers) - val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL) - adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) - protocol match { + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + clusterConf.securityProtocol) + clusterConf.securityProtocol match { case SASL_SSL.name => - setTrustStoreProperties(sparkConf, adminClientProperties) + setTrustStoreProperties(clusterConf, adminClientProperties) case SSL.name => - setTrustStoreProperties(sparkConf, adminClientProperties) - setKeyStoreProperties(sparkConf, adminClientProperties) + setTrustStoreProperties(clusterConf, adminClientProperties) + setKeyStoreProperties(clusterConf, adminClientProperties) logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " + "configure 2-way authentication on the broker side.") @@ -114,11 +124,11 @@ private[spark] object KafkaTokenUtil extends Logging { adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM) if (sparkConf.contains(KEYTAB)) { logDebug("Keytab detected, using it for login.") - val jaasParams = getKeytabJaasParams(sparkConf) + val jaasParams = getKeytabJaasParams(sparkConf, clusterConf) adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) } else { logDebug("Using ticket cache for login.") - val jaasParams = getTicketCacheJaasParams(sparkConf) + val jaasParams = getTicketCacheJaasParams(clusterConf) adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) } } @@ -135,34 +145,40 @@ private[spark] object KafkaTokenUtil extends Logging { } } - private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { - sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation => + private def setTrustStoreProperties( + clusterConf: KafkaTokenClusterConf, + properties: ju.Properties): Unit = { + clusterConf.trustStoreLocation.foreach { truststoreLocation => properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation) } - sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword => + clusterConf.trustStorePassword.foreach { truststorePassword => properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword) } } - private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { - sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation => + private def setKeyStoreProperties( + clusterConf: KafkaTokenClusterConf, + properties: ju.Properties): Unit = { + clusterConf.keyStoreLocation.foreach { keystoreLocation => properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation) } - sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword => + clusterConf.keyStorePassword.foreach { keystorePassword => properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword) } - sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword => + clusterConf.keyPassword.foreach { keyPassword => properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword) } } - private def getKeytabJaasParams(sparkConf: SparkConf): String = { + private def getKeytabJaasParams( + sparkConf: SparkConf, + clusterConf: KafkaTokenClusterConf): String = { val params = s""" |${getKrb5LoginModuleName} required | debug=${isGlobalKrbDebugEnabled()} | useKeyTab=true - | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" + | serviceName="${clusterConf.kerberosServiceName}" | keyTab="${sparkConf.get(KEYTAB).get}" | principal="${sparkConf.get(PRINCIPAL).get}"; """.stripMargin.replace("\n", "") @@ -170,16 +186,13 @@ private[spark] object KafkaTokenUtil extends Logging { params } - private def getTicketCacheJaasParams(sparkConf: SparkConf): String = { - val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) - require(serviceName.nonEmpty, "Kerberos service name must be defined") - + private def getTicketCacheJaasParams(clusterConf: KafkaTokenClusterConf): String = { val params = s""" |${getKrb5LoginModuleName} required | debug=${isGlobalKrbDebugEnabled()} | useTicketCache=true - | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"; + | serviceName="${clusterConf.kerberosServiceName}"; """.stripMargin.replace("\n", "") logDebug(s"Krb ticket cache JAAS params: $params") params @@ -223,14 +236,27 @@ private[spark] object 
KafkaTokenUtil extends Logging { } } - def isTokenAvailable(): Boolean = { - UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) != null + def findMatchingToken( + sparkConf: SparkConf, + bootStrapServers: String): Option[KafkaTokenClusterConf] = { + val tokens = UserGroupInformation.getCurrentUser().getCredentials.getAllTokens.asScala + val clusterConfigs = tokens + .filter(_.getService().toString().startsWith(TOKEN_SERVICE_PREFIX)) + .map { token => + KafkaTokenSparkConf.getClusterConfig(sparkConf, getClusterIdentifier(token.getService())) + } + .filter { clusterConfig => + val pattern = Pattern.compile(clusterConfig.targetServersRegex) + Utils.stringToSeq(bootStrapServers).exists(pattern.matcher(_).matches()) + } + require(clusterConfigs.size <= 1, "More than one delegation token matches the following " + + s"bootstrap servers: $bootStrapServers.") + clusterConfigs.headOption } - def getTokenJaasParams(sparkConf: SparkConf): String = { + def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String = { val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) + getTokenService(clusterConf.identifier)) val username = new String(token.getIdentifier) val password = new String(token.getPassword) @@ -239,7 +265,7 @@ private[spark] object KafkaTokenUtil extends Logging { s""" |$loginModuleName required | tokenauth=true - | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" + | serviceName="${clusterConf.kerberosServiceName}" | username="$username" | password="$password"; """.stripMargin.replace("\n", "") diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala index 538486b..8f4cedf 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala @@ -17,16 +17,19 @@ package org.apache.spark.kafka010 +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.config._ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTest { + private val identifier = "cluster1" + private val tokenService = KafkaTokenUtil.getTokenService(identifier) private val testModule = "testModule" private val testKey = "testKey" private val testValue = "testValue" private val otherTestValue = "otherTestValue" + private val bootStrapServers = "127.0.0.1:0" test("set should always set value") { val params = Map.empty[String, String] @@ -73,24 +76,38 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes } test("setAuthenticationConfigIfNeeded with token should set values") { - val params = Map.empty[String, String] - setSparkEnv(Map.empty) - addTokenToUGI() + val params = Map( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers + ) + setSparkEnv( + Map( + s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers + ) + ) + addTokenToUGI(tokenService) val updatedParams = KafkaConfigUpdater(testModule, params) .setAuthenticationConfigIfNeeded() .build() - assert(updatedParams.size() === 2) + assert(updatedParams.size() === 3) + 
assert(updatedParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) === - Kafka.TOKEN_SASL_MECHANISM.defaultValueString) + KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) } - test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") { - val params = Map.empty[String, String] - setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID")) - addTokenToUGI() + test("setAuthenticationConfigIfNeeded with invalid mechanism should throw exception") { + val params = Map( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers + ) + setSparkEnv( + Map( + s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> bootStrapServers, + s"spark.kafka.clusters.$identifier.sasl.token.mechanism" -> "intentionally_invalid" + ) + ) + addTokenToUGI(tokenService) val e = intercept[IllegalArgumentException] { KafkaConfigUpdater(testModule, params) @@ -102,12 +119,16 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with KafkaDelegationTokenTes } test("setAuthenticationConfigIfNeeded without security should not set values") { - val params = Map.empty[String, String] + val params = Map( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers + ) + setSparkEnv(Map.empty) val updatedParams = KafkaConfigUpdater(testModule, params) .setAuthenticationConfigIfNeeded() .build() - assert(updatedParams.size() === 0) + assert(updatedParams.size() === 1) + assert(updatedParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) } } diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index bd9b873..ac59c61 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -20,6 +20,7 @@ package org.apache.spark.kafka010 import java.{util => ju} import javax.security.auth.login.{AppConfigurationEntry, Configuration} +import org.apache.hadoop.io.Text import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token import org.mockito.Mockito.mock @@ -70,15 +71,15 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach { Configuration.setConfiguration(new KafkaJaasConfiguration) } - protected def addTokenToUGI(): Unit = { + protected def addTokenToUGI(tokenService: Text): Unit = { val token = new Token[KafkaDelegationTokenIdentifier]( tokenId.getBytes, tokenPassword.getBytes, KafkaTokenUtil.TOKEN_KIND, - KafkaTokenUtil.TOKEN_SERVICE + tokenService ) val creds = new Credentials() - creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) + creds.addToken(token.getService, token) UserGroupInformation.getCurrentUser.addCredentials(creds) } diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala new file mode 100644 index 0000000..60bb8a2 --- /dev/null +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kafka010 + +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_SSL, SSL} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach { + private val identifier1 = "cluster1" + private val identifier2 = "cluster2" + private val authBootStrapServers = "127.0.0.1:0" + private val targetServersRegex = "127.0.0.*:0" + private val securityProtocol = SSL.name + private val kerberosServiceName = "kafka1" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "trustStoreSecret" + private val keyStoreLocation = "/path/to/keyStore" + private val keyStorePassword = "keyStoreSecret" + private val keyPassword = "keySecret" + private val tokenMechanism = "SCRAM-SHA-256" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { + super.beforeEach() + sparkConf = new SparkConf() + } + + test("getClusterConfig should trow exception when not exists") { + val thrown = intercept[NoSuchElementException] { + KafkaTokenSparkConf.getClusterConfig(sparkConf, "invalid") + } + assert(thrown.getMessage contains "spark.kafka.clusters.invalid.auth.bootstrap.servers") + } + + test("getClusterConfig should return entry with defaults") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers) + + val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1) + assert(clusterConfig.identifier === identifier1) + assert(clusterConfig.authBootstrapServers === authBootStrapServers) + assert(clusterConfig.targetServersRegex === KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX) + assert(clusterConfig.securityProtocol === SASL_SSL.name) + assert(clusterConfig.kerberosServiceName === + KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreLocation === None) + assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreLocation === None) + assert(clusterConfig.keyStorePassword === None) + assert(clusterConfig.keyPassword === None) + assert(clusterConfig.tokenMechanism === KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) + } + + test("getClusterConfig should return entry overwrite defaults") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex", + targetServersRegex) + sparkConf.set(s"spark.kafka.clusters.$identifier1.security.protocol", securityProtocol) + sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.kerberos.service.name", + kerberosServiceName) + 
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.location", trustStoreLocation) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.password", trustStorePassword) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.location", keyStoreLocation) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.password", keyStorePassword) + sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.key.password", keyPassword) + sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.token.mechanism", tokenMechanism) + + val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1) + assert(clusterConfig.identifier === identifier1) + assert(clusterConfig.authBootstrapServers === authBootStrapServers) + assert(clusterConfig.targetServersRegex === targetServersRegex) + assert(clusterConfig.securityProtocol === securityProtocol) + assert(clusterConfig.kerberosServiceName === kerberosServiceName) + assert(clusterConfig.trustStoreLocation === Some(trustStoreLocation)) + assert(clusterConfig.trustStorePassword === Some(trustStorePassword)) + assert(clusterConfig.keyStoreLocation === Some(keyStoreLocation)) + assert(clusterConfig.keyStorePassword === Some(keyStorePassword)) + assert(clusterConfig.keyPassword === Some(keyPassword)) + assert(clusterConfig.tokenMechanism === tokenMechanism) + } + + test("getAllClusterConfigs should return empty list when nothing configured") { + assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty) + } + + test("getAllClusterConfigs should return empty list with malformed configuration") { + sparkConf.set(s"spark.kafka.clusters.", authBootStrapServers) + assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty) + } + + test("getAllClusterConfigs should return multiple entries") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", authBootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier2.auth.bootstrap.servers", authBootStrapServers) + + val clusterConfigs = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf) + assert(clusterConfigs.size === 2) + clusterConfigs.foreach { clusterConfig => + assert(clusterConfig.authBootstrapServers === authBootStrapServers) + assert(clusterConfig.targetServersRegex === KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX) + assert(clusterConfig.securityProtocol === SASL_SSL.name) + assert(clusterConfig.kerberosServiceName === + KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME) + assert(clusterConfig.trustStoreLocation === None) + assert(clusterConfig.trustStorePassword === None) + assert(clusterConfig.keyStoreLocation === None) + assert(clusterConfig.keyStorePassword === None) + assert(clusterConfig.keyPassword === None) + assert(clusterConfig.tokenMechanism === KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) + } + } +} diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 763f8db..11f954b 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.kafka010 import java.security.PrivilegedExceptionAction +import org.apache.hadoop.io.Text import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs} @@ -28,7 +29,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { + private val identifier1 = "cluster1" + private val identifier2 = "cluster2" + private val tokenService1 = KafkaTokenUtil.getTokenService(identifier1) + private val tokenService2 = KafkaTokenUtil.getTokenService(identifier2) private val bootStrapServers = "127.0.0.1:0" + private val matchingTargetServersRegex = "127.0.0.*:0" + private val nonMatchingTargetServersRegex = "127.0.1.*:0" private val trustStoreLocation = "/path/to/trustStore" private val trustStorePassword = "trustStoreSecret" private val keyStoreLocation = "/path/to/keyStore" @@ -59,25 +66,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { ) } - test("createAdminClientProperties without bootstrap servers should throw exception") { - val thrown = intercept[IllegalArgumentException] { - KafkaTokenUtil.createAdminClientProperties(sparkConf) - } - assert(thrown.getMessage contains - "Tried to obtain kafka delegation token but bootstrap servers not configured.") - } - test("createAdminClientProperties with SASL_PLAINTEXT protocol should not include " + "keystore and truststore config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_PLAINTEXT.name) - sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) - sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStoreLocation) - sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) - sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) - sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + val clusterConf = createClusterConf(SASL_PLAINTEXT.name) - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -91,15 +84,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties with SASL_SSL protocol should include truststore config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) - sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) - sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword) - sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) - sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) - sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + val clusterConf = createClusterConf(SASL_SSL.name) - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -116,15 +103,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { test("createAdminClientProperties with SSL protocol should include keystore and truststore " + "config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SSL.name) - sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) - sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword) - sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) - 
sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) - sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + val clusterConf = createClusterConf(SSL.name) - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -140,11 +121,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties with global config should not set dynamic jaas config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + val clusterConf = createClusterConf(SASL_SSL.name) setGlobalKafkaClientConfig() - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -155,12 +135,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties with keytab should set keytab dynamic jaas config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) sparkConf.set(KEYTAB, keytab) sparkConf.set(PRINCIPAL, principal) + val clusterConf = createClusterConf(SASL_SSL.name) - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -176,10 +155,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { } test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") { - sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) - sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + val clusterConf = createClusterConf(SASL_SSL.name) - val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf) assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === bootStrapServers) @@ -202,24 +180,73 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided) } - test("isTokenAvailable without token should return false") { - assert(!KafkaTokenUtil.isTokenAvailable()) + test("findMatchingToken without token should return None") { + assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) === None) } - test("isTokenAvailable with token should return true") { - addTokenToUGI() + test("findMatchingToken with non-matching tokens should return None") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", bootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex", + nonMatchingTargetServersRegex) + sparkConf.set(s"spark.kafka.clusters.$identifier2.bootstrap.servers", bootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier2.target.bootstrap.servers.regex", + matchingTargetServersRegex) + addTokenToUGI(tokenService1) + addTokenToUGI(new Text("intentionally_garbage")) + + 
assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) === None) + } + + test("findMatchingToken with one matching token should return cluster configuration") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", bootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex", + matchingTargetServersRegex) + addTokenToUGI(tokenService1) - assert(KafkaTokenUtil.isTokenAvailable()) + assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) === + Some(KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1))) + } + + test("findMatchingToken with multiple matching tokens should throw exception") { + sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", bootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex", + matchingTargetServersRegex) + sparkConf.set(s"spark.kafka.clusters.$identifier2.auth.bootstrap.servers", bootStrapServers) + sparkConf.set(s"spark.kafka.clusters.$identifier2.target.bootstrap.servers.regex", + matchingTargetServersRegex) + addTokenToUGI(tokenService1) + addTokenToUGI(tokenService2) + + val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) + } + assert(thrown.getMessage.contains("More than one delegation token matches")) } test("getTokenJaasParams with token should return scram module") { - addTokenToUGI() + addTokenToUGI(tokenService1) + val clusterConf = createClusterConf(SASL_SSL.name) - val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf()) + val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf) assert(jaasParams.contains("ScramLoginModule required")) assert(jaasParams.contains("tokenauth=true")) assert(jaasParams.contains(tokenId)) assert(jaasParams.contains(tokenPassword)) } + + private def createClusterConf(securityProtocol: String): KafkaTokenClusterConf = { + KafkaTokenClusterConf( + identifier1, + bootStrapServers, + KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX, + securityProtocol, + KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME, + Some(trustStoreLocation), + Some(trustStorePassword), + Some(keyStoreLocation), + Some(keyStorePassword), + Some(keyPassword), + KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM) + } } diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index f78bdac..397de87 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -96,6 +96,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> </dependency> diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index d934c64..d8df549 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -25,11 +25,13 @@ import scala.util.Random import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.mockito.Mockito.when import org.scalatest.BeforeAndAfterAll 
+import org.scalatest.mockito.MockitoSugar import org.apache.spark._ -class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { +class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with BeforeAndAfterAll { private var testUtils: KafkaTestUtils = _ private val topic = "topic" + Random.nextInt() private val topicPartition = new TopicPartition(topic, 0) @@ -37,6 +39,11 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { override def beforeAll(): Unit = { super.beforeAll() + val conf = new SparkConf() + val env = mock[SparkEnv] + SparkEnv.set(env) + when(env.conf).thenReturn(conf) + testUtils = new KafkaTestUtils testUtils.setup() KafkaDataConsumer.init(16, 64, 0.75f) @@ -47,6 +54,7 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { testUtils.teardown() testUtils = null } + SparkEnv.set(null) super.afterAll() }
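
To make the matching behaviour concrete, below is a minimal sketch of a Structured Streaming job that reads from one secured Kafka cluster and writes to another. Only the `spark.kafka.clusters.${cluster}.*` keys and the regex-based token matching come from the documentation added in this commit; the cluster identifiers (`source`, `sink`), host names, topics, and checkpoint path are hypothetical, and in practice these settings are normally passed with `spark-submit --conf` together with `--keytab`/`--principal` as shown in the docs above.

```scala
import org.apache.spark.sql.SparkSession

object MultiClusterKafkaSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical identifiers ("source", "sink") and hosts; only the
    // spark.kafka.clusters.${cluster}.* keys come from the commit's documentation.
    val spark = SparkSession.builder()
      .appName("multi-cluster-kafka-delegation-token-sketch")
      // Cluster the query reads from (e.g. brokers in realm1).
      .config("spark.kafka.clusters.source.auth.bootstrap.servers",
        "kafka1.realm1.example.com:9093")
      .config("spark.kafka.clusters.source.target.bootstrap.servers.regex",
        ".*\\.realm1\\.example\\.com:9093")
      // Cluster the query writes to (e.g. brokers in realm2).
      .config("spark.kafka.clusters.sink.auth.bootstrap.servers",
        "kafka1.realm2.example.com:9093")
      .config("spark.kafka.clusters.sink.target.bootstrap.servers.regex",
        ".*\\.realm2\\.example\\.com:9093")
      .getOrCreate()

    // The source's kafka.bootstrap.servers matches the "source" regex, so the
    // delegation token obtained for that cluster is used when connecting.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1.realm1.example.com:9093")
      .option("subscribe", "topic1")
      .load()

    // The sink's kafka.bootstrap.servers matches the "sink" regex instead.
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1.realm2.example.com:9093")
      .option("topic", "topic2")
      .option("checkpointLocation", "/tmp/multi-cluster-sketch-checkpoint")
      .start()
      .awaitTermination()
  }
}
```

If both regexes matched the same bootstrap address, `KafkaTokenUtil.findMatchingToken` would find more than one candidate and the query would not start, which is why the per-cluster `target.bootstrap.servers.regex` values should not overlap.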