[ 
https://issues.apache.org/jira/browse/SPARK-26322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16719604#comment-16719604
 ] 

ASF GitHub Bot commented on SPARK-26322:
----------------------------------------

asfgit closed pull request #23274: [SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.
URL: https://github.com/apache/spark/pull/23274
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 85d74c27142ad..88c612c2d951a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -79,4 +79,13 @@ private[spark] object Kafka {
         "For further details please see kafka documentation. Only used to 
obtain delegation token.")
       .stringConf
       .createOptional
+
+  val TOKEN_SASL_MECHANISM =
+    ConfigBuilder("spark.kafka.sasl.token.mechanism")
+      .doc("SASL mechanism used for client connections with delegation token. 
Because SCRAM " +
+        "login module used for authentication a compatible mechanism has to be 
set here. " +
+        "For further details please see kafka documentation (sasl.mechanism). 
Only used to " +
+        "authenticate against Kafka broker with delegation token.")
+      .stringConf
+      .createWithDefault("SCRAM-SHA-512")
 }
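
For illustration, a minimal sketch of how an application might set this new parameter (the host names and the SCRAM-SHA-256 choice are hypothetical; overriding is only needed when the brokers do not use the SCRAM-SHA-512 default):

    import org.apache.spark.sql.SparkSession

    // Hypothetical setup: the brokers accept SCRAM-SHA-256, so the
    // SCRAM-SHA-512 default of spark.kafka.sasl.token.mechanism is overridden.
    val spark = SparkSession.builder()
      .appName("KafkaDelegationTokenExample")
      .config("spark.kafka.bootstrap.servers", "host1:port1,host2:port2")
      .config("spark.kafka.sasl.token.mechanism", "SCRAM-SHA-256")
      .getOrCreate()
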
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7040f8da2c614..3d64ec4cb55f7 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
 Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
 - **Keytab file**, such as,
 
       ./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
 
 After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
 Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match with Kafka broker configuration.
 
 When delegation token is available on an executor it can be overridden with JAAS login configuration.
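
With the central parameter in place, a source or sink no longer needs a per-query `kafka.sasl.mechanism` option; a sketch of the streaming source from the removed example above, minus that option:

    // Kafka source for a streaming query after this change; the SASL mechanism
    // is taken from spark.kafka.sasl.token.mechanism rather than a per-source option.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]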
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
index 74d5ef9c05f14..a9b76def4f8be 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.kafka010
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.kafka.common.security.scram.ScramLoginModule
 
 import org.apache.spark.SparkConf
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 0ac330435e5c5..6a0c2088ac3d1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       this
     }
 
-    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
       // There are multiple possibilities to log in and applied in the following order:
       // - JVM global security provided -> try to log in with JVM global security configuration
       //   which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       } else if (KafkaSecurityHelper.isTokenAvailable()) {
         logDebug("Delegation token detected, using it for login.")
         val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        val mechanism = kafkaParams
-          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
         require(mechanism.startsWith("SCRAM"),
           "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        set(SaslConfigs.SASL_MECHANISM, mechanism)
       }
       this
     }
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     ConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
       .build()
   }
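
The new setAuthenticationConfigIfNeeded also validates the configured mechanism before use. A self-contained sketch of that require check (the object name and sample values are hypothetical, for illustration only):

    object MechanismCheck {
      // Mirrors the validation above: only SCRAM mechanisms are accepted
      // for delegation token authentication.
      def validate(mechanism: String): String = {
        require(mechanism.startsWith("SCRAM"),
          "Delegation token works only with SCRAM mechanism.")
        mechanism
      }

      def main(args: Array[String]): Unit = {
        println(validate("SCRAM-SHA-512"))  // ok, the default
        println(validate("SCRAM-SHA-256"))  // ok, if it matches the broker setup
        // validate("PLAIN")                // would throw IllegalArgumentException
      }
    }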
 


 


> Simplify kafka delegation token sasl.mechanism configuration
> ------------------------------------------------------------
>
>                 Key: SPARK-26322
>                 URL: https://issues.apache.org/jira/browse/SPARK-26322
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Gabor Somogyi
>            Assignee: Gabor Somogyi
>            Priority: Major
>             Fix For: 3.0.0
>
>
> When a Kafka delegation token is obtained, a SCRAM sasl.mechanism has to be 
> configured for authentication. This can be configured on the related 
> source/sink, which is inconvenient from the user's perspective. Such 
> granularity is not required, and this configuration can be implemented with 
> one central parameter.
> Kafka currently supports two SCRAM-related sasl.mechanism values:
> - SCRAM-SHA-256
> - SCRAM-SHA-512
> These are configured on the brokers, which makes this configuration global.


