[ https://issues.apache.org/jira/browse/SPARK-26322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719604#comment-16719604 ]
ASF GitHub Bot commented on SPARK-26322: ---------------------------------------- asfgit closed pull request #23274: [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration. URL: https://github.com/apache/spark/pull/23274 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala index 85d74c27142ad..88c612c2d951a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala @@ -79,4 +79,13 @@ private[spark] object Kafka { "For further details please see kafka documentation. Only used to obtain delegation token.") .stringConf .createOptional + + val TOKEN_SASL_MECHANISM = + ConfigBuilder("spark.kafka.sasl.token.mechanism") + .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " + + "login module used for authentication a compatible mechanism has to be set here. " + + "For further details please see kafka documentation (sasl.mechanism). Only used to " + + "authenticate against Kafka broker with delegation token.") + .stringConf + .createWithDefault("SCRAM-SHA-512") } diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 7040f8da2c614..3d64ec4cb55f7 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). 
For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set, Spark considers the following log in options, in order of preference: -- **JAAS login configuration** +- **JAAS login configuration**, please see example below. - **Keytab file**, such as, ./bin/spark-submit \ @@ -669,144 +669,8 @@ Kafka broker configuration): After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration): - -<div class="codetabs"> -<div data-lang="scala" markdown="1"> -{% highlight scala %} - -// Setting on Kafka Source for Streaming Queries -val df = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("subscribe", "topic1") - .load() -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - -// Setting on Kafka Source for Batch Queries -val df = spark - .read - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("subscribe", "topic1") - .load() -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - -// Setting on Kafka Sink for Streaming Queries -val ds = df - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .writeStream - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - 
.option("topic", "topic1") - .start() - -// Setting on Kafka Sink for Batch Queries -val ds = df - .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)") - .write - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .save() - -{% endhighlight %} -</div> -<div data-lang="java" markdown="1"> -{% highlight java %} - -// Setting on Kafka Source for Streaming Queries -Dataset<Row> df = spark - .readStream() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("subscribe", "topic1") - .load(); -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"); - -// Setting on Kafka Source for Batch Queries -Dataset<Row> df = spark - .read() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("subscribe", "topic1") - .load(); -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"); - -// Setting on Kafka Sink for Streaming Queries -StreamingQuery ds = df - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .writeStream() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("topic", "topic1") - .start(); - -// Setting on Kafka Sink for Batch Queries -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .write() - .format("kafka") - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") - .option("topic", "topic1") - .save(); - -{% endhighlight %} -</div> -<div data-lang="python" markdown="1"> -{% highlight python %} - -// Setting on Kafka Source for Streaming Queries -df = spark \ - .readStream \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ - .option("kafka.sasl.mechanism", 
"SCRAM-SHA-512") \ - .option("subscribe", "topic1") \ - .load() -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - -// Setting on Kafka Source for Batch Queries -df = spark \ - .read \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ - .option("subscribe", "topic1") \ - .load() -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - -// Setting on Kafka Sink for Streaming Queries -ds = df \ - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ - .writeStream \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ - .option("topic", "topic1") \ - .start() - -// Setting on Kafka Sink for Batch Queries -df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ - .write \ - .format("kafka") \ - .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ - .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \ - .option("topic", "topic1") \ - .save() - -{% endhighlight %} -</div> -</div> +`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter +must match with Kafka broker configuration. When delegation token is available on an executor it can be overridden with JAAS login configuration. 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala index 74d5ef9c05f14..a9b76def4f8be 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.kafka010 import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.kafka.common.security.scram.ScramLoginModule import org.apache.spark.SparkConf diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 0ac330435e5c5..6a0c2088ac3d1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.SparkEnv import org.apache.spark.deploy.security.KafkaTokenUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ @@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() def kafkaParamsForExecutors( 
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() /** @@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { this } - def setTokenJaasConfigIfNeeded(): ConfigUpdater = { + def setAuthenticationConfigIfNeeded(): ConfigUpdater = { // There are multiple possibilities to log in and applied in the following order: // - JVM global security provided -> try to log in with JVM global security configuration // which can be configured for example with 'java.security.auth.login.config'. @@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging { } else if (KafkaSecurityHelper.isTokenAvailable()) { logDebug("Delegation token detected, using it for login.") val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) - val mechanism = kafkaParams - .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM) + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) require(mechanism.startsWith("SCRAM"), "Delegation token works only with SCRAM mechanism.") - set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + set(SaslConfigs.SASL_MECHANISM, mechanism) } this } @@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { ConfigUpdater("executor", specifiedKafkaParams) .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName) .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Simplify kafka delegation token sasl.mechanism configuration > ------------------------------------------------------------ > > Key: SPARK-26322 > URL: https://issues.apache.org/jira/browse/SPARK-26322 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming > Affects Versions: 3.0.0 > Reporter: Gabor Somogyi > Assignee: Gabor Somogyi > Priority: Major > Fix For: 3.0.0 > > > When a Kafka delegation token is obtained, a SCRAM sasl.mechanism has to be > configured for authentication. This can be configured on the related > source/sink, which is inconvenient from the user's perspective. Such granularity is > not required and this configuration can be implemented with one central > parameter. > Kafka now supports two SCRAM-related sasl.mechanism values: > - SCRAM-SHA-256 > - SCRAM-SHA-512 > and these are configured on brokers, which makes this configuration global. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org