Repository: spark
Updated Branches:
  refs/heads/master f97326bcd -> 0166c7373


[SPARK-25501][SS] Add kafka delegation token support.

## What changes were proposed in this pull request?

This PR adds Kafka delegation token support for Structured Streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing).

What this PR contains:
* Configuration parameters for the feature (see the sketch after this list)
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code
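
The new parameters are illustrated by the rough, hypothetical sketch below (broker addresses, paths and passwords are made up; only the `spark.kafka.*` keys come from this change):
```
import org.apache.spark.SparkConf

object KafkaTokenConfSketch {
  // Hypothetical values: broker addresses, paths and passwords are placeholders.
  val conf: SparkConf = new SparkConf()
    .set("spark.kafka.bootstrap.servers", "broker1:9092,broker2:9092") // required for token fetching
    .set("spark.kafka.security.protocol", "SASL_SSL")                  // SASL_SSL is the default
    .set("spark.kafka.sasl.kerberos.service.name", "kafka")
    .set("spark.kafka.ssl.truststore.location", "/path/to/truststore.jks")
    .set("spark.kafka.ssl.truststore.password", "truststore-secret")
}
```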

What this PR doesn't contain:
* Documentation changes because design can change

## How was this patch tested?

Existing tests, plus a small number of additional unit tests.

Because this is an integration with an external service, it was mainly tested on a cluster:
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256

An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID         HMAC                           OWNER           RENEWERS                  ISSUEDATE       EXPIRYDATE      MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user    []                        2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```
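
The token obtained above lands in the current user's Hadoop credentials under the service name `kafka.server.delegation.token`. As a debugging sketch (not part of this change), its presence could be checked on the driver roughly like this:
```
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation

object KafkaTokenPresenceCheck {
  // Looks up the delegation token under the service name used by KafkaTokenUtil.
  def tokenPresent(): Boolean = {
    val token = UserGroupInformation.getCurrentUser.getCredentials
      .getToken(new Text("kafka.server.delegation.token"))
    token != null
  }
}
```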

An example of token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```
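
For context, a minimal streaming query that would pick the token up might look as follows (a sketch with made-up broker and topic names; with a token present and a SCRAM mechanism configured, `KafkaSourceProvider` injects the JAAS config shown above automatically):
```
import org.apache.spark.sql.SparkSession

object KafkaTokenReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-token-example").getOrCreate()

    // Hypothetical broker/topic names; no keytab or JAAS file is passed to the executors.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
      .option("subscribe", "sometopic")
      .load()

    val query = df.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```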

Closes #22598 from gaborgsomogyi/SPARK-25501.

Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0166c737
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0166c737
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0166c737

Branch: refs/heads/master
Commit: 0166c7373eee2654c49c210927e4e290d103f24f
Parents: f97326b
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
Authored: Thu Nov 29 18:00:47 2018 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Nov 29 18:00:47 2018 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |  13 +
 .../security/HadoopDelegationTokenManager.scala |   3 +-
 .../security/KafkaDelegationTokenProvider.scala |  61 +++++
 .../spark/deploy/security/KafkaTokenUtil.scala  | 202 ++++++++++++++++
 .../apache/spark/internal/config/Kafka.scala    |  82 +++++++
 .../HadoopDelegationTokenManagerSuite.scala     |   5 +-
 .../deploy/security/KafkaTokenUtilSuite.scala   | 239 +++++++++++++++++++
 external/kafka-0-10-sql/pom.xml                 |   2 -
 .../sql/kafka010/KafkaSecurityHelper.scala      |  56 +++++
 .../sql/kafka010/KafkaSourceProvider.scala      |  82 ++++---
 .../kafka010/KafkaStreamingWriteSupport.scala   |  22 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala |   4 +-
 .../sql/kafka010/KafkaSecurityHelperSuite.scala | 100 ++++++++
 external/kafka-0-10/pom.xml                     |   2 -
 pom.xml                                         |   2 +
 15 files changed, 825 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 36d9321..49b1a54 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -408,6 +408,19 @@
       <scope>provided</scope>
     </dependency>
 
+    <!--
+     The following Kafka dependency is used to obtain delegation tokens.
+     In order to prevent spark-core from depending on Kafka, the dependency has been placed in
+     the "provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions
+     are handled when the user explicitly uses neither the spark-streaming-kafka nor the
+     spark-sql-kafka modules.
+    -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 1169b28..126a6ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager(
       new HadoopFSDelegationTokenProvider(
         () => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+      safeCreateProvider(new HBaseDelegationTokenProvider) ++
+      safeCreateProvider(new KafkaDelegationTokenProvider)
 
     // Filter out providers for which spark.security.credentials.{service}.enabled is false.
     providers

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
new file mode 100644
index 0000000..45995be
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.language.existentials
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    try {
+      logDebug("Attempting to fetch Kafka security token.")
+      val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
+      creds.addToken(token.getService, token)
+      return Some(nextRenewalDate)
+    } catch {
+      case NonFatal(e) =>
+        logInfo(s"Failed to get token from service $serviceName", e)
+    }
+    None
+  }
+
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Boolean = {
+    val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
+    sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) &&
+      (protocol == SASL_SSL.name ||
+        protocol == SSL.name ||
+        protocol == SASL_PLAINTEXT.name)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
new file mode 100644
index 0000000..c890cee
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import java.text.SimpleDateFormat
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+    override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+    val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
+    val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+    val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+    val token = createResult.delegationToken().get()
+    printToken(token)
+
+    (new Token[KafkaDelegationTokenIdentifier](
+      token.tokenInfo.tokenId.getBytes,
+      token.hmacAsBase64String.getBytes,
+      TOKEN_KIND,
+      TOKEN_SERVICE
+    ), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
+    val adminClientProperties = new ju.Properties
+
+    val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS)
+    require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
+      "servers not configured.")
+    adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)
+
+    val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
+    adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
+    protocol match {
+      case SASL_SSL.name =>
+        setTrustStoreProperties(sparkConf, adminClientProperties)
+
+      case SSL.name =>
+        setTrustStoreProperties(sparkConf, adminClientProperties)
+        setKeyStoreProperties(sparkConf, adminClientProperties)
+        logWarning("Obtaining kafka delegation token with SSL protocol. Please 
" +
+          "configure 2-way authentication on the broker side.")
+
+      case SASL_PLAINTEXT.name =>
+        logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+          "consider the security impact.")
+    }
+
+    // There are multiple ways to log in, applied in the following order:
+    // - JVM global security configuration provided -> try to log in with the JVM global security
+    //   configuration, which can be set for example with 'java.security.auth.login.config'.
+    //   No additional parameter is needed for this.
+    // - Keytab provided -> try to log in with the Kerberos module and keytab using Kafka's
+    //   dynamic JAAS configuration.
+    // - Keytab not provided -> try to log in with the Kerberos module and ticket cache using
+    //   Kafka's dynamic JAAS configuration.
+    // The Kafka client is unable to use a Subject from a JVM which has already logged in
+    // to the KDC (see KAFKA-7677).
+    if (isGlobalJaasConfigurationProvided) {
+      logDebug("JVM global security configuration detected, using it for 
login.")
+    } else {
+      adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)
+      if (sparkConf.contains(KEYTAB)) {
+        logDebug("Keytab detected, using it for login.")
+        val jaasParams = getKeytabJaasParams(sparkConf)
+        adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+      } else {
+        logDebug("Using ticket cache for login.")
+        val jaasParams = getTicketCacheJaasParams(sparkConf)
+        adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+      }
+    }
+
+    adminClientProperties
+  }
+
+  def isGlobalJaasConfigurationProvided: Boolean = {
+    try {
+      JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]())
+      true
+    } catch {
+      case NonFatal(_) => false
+    }
+  }
+
+  private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
+    sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
+      properties.put("ssl.truststore.location", truststoreLocation)
+    }
+    sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
+      properties.put("ssl.truststore.password", truststorePassword)
+    }
+  }
+
+  private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
+    sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
+      properties.put("ssl.keystore.location", keystoreLocation)
+    }
+    sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
+      properties.put("ssl.keystore.password", keystorePassword)
+    }
+    sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
+      properties.put("ssl.key.password", keyPassword)
+    }
+  }
+
+  private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
+    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+    require(serviceName.nonEmpty, "Kerberos service name must be defined")
+
+    val params =
+      s"""
+      |${getKrb5LoginModuleName} required
+      | useKeyTab=true
+      | serviceName="${serviceName.get}"
+      | keyTab="${sparkConf.get(KEYTAB).get}"
+      | principal="${sparkConf.get(PRINCIPAL).get}";
+      """.stripMargin.replace("\n", "")
+    logDebug(s"Krb keytab JAAS params: $params")
+    params
+  }
+
+  def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
+    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+    require(serviceName.nonEmpty, "Kerberos service name must be defined")
+
+    val params =
+      s"""
+      |${getKrb5LoginModuleName} required
+      | useTicketCache=true
+      | serviceName="${serviceName.get}";
+      """.stripMargin.replace("\n", "")
+    logDebug(s"Krb ticket cache JAAS params: $params")
+    params
+  }
+
+  /**
+   * The Krb5LoginModule package varies across JVMs.
+   * Please see Hadoop UserGroupInformation for further details.
+   */
+  private def getKrb5LoginModuleName(): String = {
+    if (System.getProperty("java.vendor").contains("IBM")) {
+      "com.ibm.security.auth.module.Krb5LoginModule"
+    } else {
+      "com.sun.security.auth.module.Krb5LoginModule"
+    }
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+    if (log.isDebugEnabled) {
+      val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+      logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+        "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+      val tokenInfo = token.tokenInfo
+      logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+        tokenInfo.tokenId,
+        tokenInfo.owner,
+        tokenInfo.renewersAsString,
+        dateFormat.format(tokenInfo.issueTimestamp),
+        dateFormat.format(tokenInfo.expiryTimestamp),
+        dateFormat.format(tokenInfo.maxTimestamp)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
new file mode 100644
index 0000000..85d74c2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Kafka {
+
+  val BOOTSTRAP_SERVERS =
+    ConfigBuilder("spark.kafka.bootstrap.servers")
+      .doc("A list of coma separated host/port pairs to use for establishing 
the initial " +
+        "connection to the Kafka cluster. For further details please see kafka 
documentation. " +
+        "Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val SECURITY_PROTOCOL =
+    ConfigBuilder("spark.kafka.security.protocol")
+      .doc("Protocol used to communicate with brokers. For further details 
please see kafka " +
+        "documentation. Only used to obtain delegation token.")
+      .stringConf
+      .createWithDefault("SASL_SSL")
+
+  val KERBEROS_SERVICE_NAME =
+    ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
+      .doc("The Kerberos principal name that Kafka runs as. This can be 
defined either in " +
+        "Kafka's JAAS config or in Kafka's config. For further details please 
see kafka " +
+        "documentation. Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val TRUSTSTORE_LOCATION =
+    ConfigBuilder("spark.kafka.ssl.truststore.location")
+      .doc("The location of the trust store file. For further details please 
see kafka " +
+        "documentation. Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val TRUSTSTORE_PASSWORD =
+    ConfigBuilder("spark.kafka.ssl.truststore.password")
+      .doc("The store password for the trust store file. This is optional for 
client and only " +
+        "needed if ssl.truststore.location is configured. For further details 
please see kafka " +
+        "documentation. Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val KEYSTORE_LOCATION =
+    ConfigBuilder("spark.kafka.ssl.keystore.location")
+      .doc("The location of the key store file. This is optional for client 
and can be used for " +
+        "two-way authentication for client. For further details please see 
kafka documentation. " +
+        "Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val KEYSTORE_PASSWORD =
+    ConfigBuilder("spark.kafka.ssl.keystore.password")
+      .doc("The store password for the key store file. This is optional for 
client and only " +
+        "needed if ssl.keystore.location is configured. For further details 
please see kafka " +
+        "documentation. Only used to obtain delegation token.")
+      .stringConf
+      .createOptional
+
+  val KEY_PASSWORD =
+    ConfigBuilder("spark.kafka.ssl.key.password")
+      .doc("The password of the private key in the key store file. This is 
optional for client. " +
+        "For further details please see kafka documentation. Only used to 
obtain delegation token.")
+      .stringConf
+      .createOptional
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index e0e630e..def9e62 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.security
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.Credentials
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Utils
@@ -33,6 +31,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
     assert(manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(manager.isProviderLoaded("hive"))
+    assert(manager.isProviderLoaded("kafka"))
   }
 
   test("disable hive credential provider") {
@@ -41,6 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
     assert(manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(!manager.isProviderLoaded("hive"))
+    assert(manager.isProviderLoaded("kafka"))
   }
 
   test("using deprecated configurations") {
@@ -51,6 +51,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
     assert(!manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(!manager.isProviderLoaded("hive"))
+    assert(manager.isProviderLoaded("kafka"))
   }
 
   test("SPARK-23209: obtain tokens when Hive classes are not available") {

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
new file mode 100644
index 0000000..682bebd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "trustStoreSecret"
+  private val keyStoreLocation = "/path/to/keyStore"
+  private val keyStorePassword = "keyStoreSecret"
+  private val keyPassword = "keySecret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  private class KafkaJaasConfiguration extends Configuration {
+    val entry =
+      new AppConfigurationEntry(
+        "DummyModule",
+        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+        ju.Collections.emptyMap[String, Object]()
+      )
+
+    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+      if (name.equals("KafkaClient")) {
+        Array(entry)
+      } else {
+        null
+      }
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    sparkConf = new SparkConf()
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      resetGlobalConfig()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  private def setGlobalKafkaClientConfig(): Unit = {
+    Configuration.setConfiguration(new KafkaJaasConfiguration)
+  }
+
+  private def resetGlobalConfig(): Unit = {
+    Configuration.setConfiguration(null)
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    }
+    assert(thrown.getMessage contains
+      "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties with SASL_PLAINTEXT protocol should not 
include " +
+      "keystore and truststore config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_PLAINTEXT.name)
+    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStoreLocation)
+    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SASL_PLAINTEXT.name)
+    assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+    assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+    assert(!adminClientProperties.containsKey("ssl.keystore.location"))
+    assert(!adminClientProperties.containsKey("ssl.keystore.password"))
+    assert(!adminClientProperties.containsKey("ssl.key.password"))
+  }
+
+  test("createAdminClientProperties with SASL_SSL protocol should include 
truststore config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
+    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SASL_SSL.name)
+    assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+    assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+    assert(!adminClientProperties.containsKey("ssl.keystore.location"))
+    assert(!adminClientProperties.containsKey("ssl.keystore.password"))
+    assert(!adminClientProperties.containsKey("ssl.key.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should include keystore 
and truststore " +
+      "config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SSL.name)
+    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
+    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SSL.name)
+    assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+    assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+    assert(adminClientProperties.get("ssl.keystore.location") === 
keyStoreLocation)
+    assert(adminClientProperties.get("ssl.keystore.password") === 
keyStorePassword)
+    assert(adminClientProperties.get("ssl.key.password") === keyPassword)
+  }
+
+  test("createAdminClientProperties with global config should not set dynamic 
jaas config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    setGlobalKafkaClientConfig()
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SASL_SSL.name)
+    assert(!adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+    assert(!adminClientProperties.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+  }
+
+  test("createAdminClientProperties with keytab should set keytab dynamic jaas 
config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    sparkConf.set(KEYTAB, keytab)
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+    sparkConf.set(PRINCIPAL, principal)
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SASL_SSL.name)
+    assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+    val saslJaasConfig = 
adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
+    assert(saslJaasConfig.contains("Krb5LoginModule required"))
+    assert(saslJaasConfig.contains("useKeyTab=true"))
+  }
+
+  test("createAdminClientProperties without keytab should set ticket cache 
dynamic jaas config") {
+    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+    
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+      === bootStrapServers)
+    
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+      === SASL_SSL.name)
+    assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+    val saslJaasConfig = 
adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
+    assert(saslJaasConfig.contains("Krb5LoginModule required"))
+    assert(saslJaasConfig.contains("useTicketCache=true"))
+  }
+
+  test("isGlobalJaasConfigurationProvided without global config should return 
false") {
+    assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
+  }
+
+  test("isGlobalJaasConfigurationProvided with global config should return 
false") {
+    setGlobalKafkaClientConfig()
+
+    assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
+  }
+
+  test("getKeytabJaasParams with keytab no service should throw exception") {
+    sparkConf.set(KEYTAB, keytab)
+
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaTokenUtil.getKeytabJaasParams(sparkConf)
+    }
+
+    assert(thrown.getMessage contains "Kerberos service name must be defined")
+  }
+
+  test("getTicketCacheJaasParams without service should throw exception") {
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaTokenUtil.getTicketCacheJaasParams(sparkConf)
+    }
+
+    assert(thrown.getMessage contains "Kerberos service name must be defined")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 1af4071..de8731c 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,8 +29,6 @@
   <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>2.1.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Kafka 0.10+ Source for Structured Streaming</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
new file mode 100644
index 0000000..74d5ef9
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def isTokenAvailable(): Boolean = {
+    UserGroupInformation.getCurrentUser().getCredentials.getToken(
+      KafkaTokenUtil.TOKEN_SERVICE) != null
+  }
+
+  def getTokenJaasParams(sparkConf: SparkConf): String = {
+    val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
+      KafkaTokenUtil.TOKEN_SERVICE)
+    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+    require(serviceName.isDefined, "Kerberos service name must be defined")
+    val username = new String(token.getIdentifier)
+    val password = new String(token.getPassword)
+
+    val loginModuleName = classOf[ScramLoginModule].getName
+    val params =
+      s"""
+      |$loginModuleName required
+      | tokenauth=true
+      | serviceName="${serviceName.get}"
+      | username="$username"
+      | password="$password";
+      """.stripMargin.replace("\n", "")
+    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", 
"password=\"[hidden]\"")}")
+
+    params
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f770f0c..0ac3304 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,16 +18,19 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.{Locale, Optional, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
@@ -80,12 +83,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
+    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
 
     val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
       STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
@@ -122,12 +120,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
 
     val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
+    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
 
     val startingStreamOffsets = 
KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
       STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
@@ -198,12 +191,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       parameters: Map[String, String]): BaseRelation = {
     validateBatchOptions(parameters)
     val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
+    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
 
     val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
       caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, 
EarliestOffsetRangeLimit)
@@ -230,8 +218,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       outputMode: OutputMode): Sink = {
     val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
     val specifiedKafkaParams = kafkaParamsForProducer(parameters)
-    new KafkaSink(sqlContext,
-      new ju.HashMap[String, Object](specifiedKafkaParams.asJava), 
defaultTopic)
+    new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
   }
 
   override def createRelation(
@@ -248,8 +235,8 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     }
     val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
     val specifiedKafkaParams = kafkaParamsForProducer(parameters)
-    KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
-      new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
+    KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, 
specifiedKafkaParams,
+      topic)
 
     /* This method is suppose to return a relation that reads the data that 
was written.
      * We cannot support this for Kafka. Therefore, in order to make things 
consistent,
@@ -274,13 +261,11 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       options: DataSourceOptions): StreamingWriteSupport = {
     import scala.collection.JavaConverters._
 
-    val spark = SparkSession.getActiveSession.get
     val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
     // We convert the options argument from V2 -> Java map -> scala mutable -> 
scala immutable.
     val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
 
-    KafkaWriter.validateQuery(
-      schema.toAttributes, new java.util.HashMap[String, 
Object](producerParams.asJava), topic)
+    KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
 
     new KafkaStreamingWriteSupport(topic, producerParams, schema)
   }
@@ -481,6 +466,7 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
 
 
 
+  private val serClassName = classOf[ByteArraySerializer].getName
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 
   def getKafkaOffsetRangeLimit(
@@ -515,6 +501,7 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
+      .setTokenJaasConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -536,6 +523,7 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
+      .setTokenJaasConfigIfNeeded()
       .build()
 
   /**
@@ -568,11 +556,32 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
       this
     }
 
+    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+      // There are multiple ways to log in, applied in the following order:
+      // - JVM global security configuration provided -> try to log in with the JVM global
+      //   security configuration, which can be set for example with
+      //   'java.security.auth.login.config'. No additional parameter is needed for this.
+      // - Token provided -> try to log in with the SCRAM module using Kafka's dynamic JAAS
+      //   configuration.
+      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
+        logDebug("JVM global security configuration detected, using it for login.")
+      } else if (KafkaSecurityHelper.isTokenAvailable()) {
+        logDebug("Delegation token detected, using it for login.")
+        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+        val mechanism = kafkaParams
+          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        require(mechanism.startsWith("SCRAM"),
+          "Delegation token works only with SCRAM mechanism.")
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+      }
+      this
+    }
+
     def build(): ju.Map[String, Object] = map
   }
 
   private[kafka010] def kafkaParamsForProducer(
-      parameters: Map[String, String]): Map[String, String] = {
+      parameters: Map[String, String]): ju.Map[String, Object] = {
     val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
     if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}"))
 {
       throw new IllegalArgumentException(
@@ -580,17 +589,26 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
           + "are serialized with ByteArraySerializer.")
     }
 
-    if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
-    {
+    if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
 {
       throw new IllegalArgumentException(
         s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is 
not supported as "
           + "value are serialized with ByteArraySerializer.")
     }
+
+    val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+
+    ConfigUpdater("executor", specifiedKafkaParams)
+      .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
+      .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
+      .setTokenJaasConfigIfNeeded()
+      .build()
+  }
+
+  private def convertToSpecifiedParams(parameters: Map[String, String]): 
Map[String, String] = {
     parameters
       .keySet
       .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
       .map { k => k.drop(6).toString -> parameters(k) }
-      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
classOf[ByteArraySerializer].getName,
-      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
classOf[ByteArraySerializer].getName)
+      .toMap
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
index 927c56d..0d831c3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.kafka010
 
-import scala.collection.JavaConverters._
+import java.{util => ju}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -41,10 +41,12 @@ case object KafkaWriterCommitMessage extends 
WriterCommitMessage
  * @param schema The schema of the input data.
  */
 class KafkaStreamingWriteSupport(
-    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+    topic: Option[String],
+    producerParams: ju.Map[String, Object],
+    schema: StructType)
   extends StreamingWriteSupport {
 
-  validateQuery(schema.toAttributes, producerParams.toMap[String, 
Object].asJava, topic)
+  validateQuery(schema.toAttributes, producerParams, topic)
 
   override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)
@@ -62,7 +64,9 @@ class KafkaStreamingWriteSupport(
  * @param schema The schema of the input data.
  */
 case class KafkaStreamWriterFactory(
-    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+    topic: Option[String],
+    producerParams: ju.Map[String, Object],
+    schema: StructType)
   extends StreamingDataWriterFactory {
 
   override def createWriter(
@@ -83,12 +87,12 @@ case class KafkaStreamWriterFactory(
  * @param inputSchema The attributes in the input data.
  */
 class KafkaStreamDataWriter(
-    targetTopic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
+    targetTopic: Option[String],
+    producerParams: ju.Map[String, Object],
+    inputSchema: Seq[Attribute])
   extends KafkaRowWriter(inputSchema, targetTopic) with 
DataWriter[InternalRow] {
-  import scala.collection.JavaConverters._
 
-  private lazy val producer = CachedKafkaProducer.getOrCreate(
-    new java.util.HashMap[String, Object](producerParams.asJava))
+  private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams)
 
   def write(row: InternalRow): Unit = {
     checkForErrors()
@@ -112,7 +116,7 @@ class KafkaStreamDataWriter(
     if (producer != null) {
       producer.flush()
       checkForErrors()
-      CachedKafkaProducer.close(new java.util.HashMap[String, 
Object](producerParams.asJava))
+      CachedKafkaProducer.close(producerParams)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index 3f6fcf6..b21037b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -409,7 +409,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     */
     val topic = newTopic()
     testUtils.createTopic(topic, 1)
-    val options = new java.util.HashMap[String, String]
+    val options = new java.util.HashMap[String, Object]
     options.put("bootstrap.servers", testUtils.brokerAddress)
     options.put("buffer.memory", "16384") // min buffer size
     options.put("block.on.buffer.full", "true")
@@ -417,7 +417,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[ByteArraySerializer].getName)
     val inputSchema = Seq(AttributeReference("value", BinaryType)())
     val data = new Array[Byte](15000) // large value
-    val writeTask = new KafkaStreamDataWriter(Some(topic), 
options.asScala.toMap, inputSchema)
+    val writeTask = new KafkaStreamDataWriter(Some(topic), options, 
inputSchema)
     try {
       val fieldTypes: Array[DataType] = Array(BinaryType)
       val converter = UnsafeProjection.create(fieldTypes)

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
new file mode 100644
index 0000000..772fe46
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config._
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    sparkConf = new SparkConf()
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      resetUGI
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  private def addTokenToUGI(): Unit = {
+    val token = new Token[KafkaDelegationTokenIdentifier](
+      tokenId.getBytes,
+      tokenPassword.getBytes,
+      KafkaTokenUtil.TOKEN_KIND,
+      KafkaTokenUtil.TOKEN_SERVICE
+    )
+    val creds = new Credentials()
+    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
+    UserGroupInformation.setLoginUser(null)
+  }
+
+  test("isTokenAvailable without token should return false") {
+    assert(!KafkaSecurityHelper.isTokenAvailable())
+  }
+
+  test("isTokenAvailable with token should return true") {
+    addTokenToUGI()
+
+    assert(KafkaSecurityHelper.isTokenAvailable())
+  }
+
+  test("getTokenJaasParams with token no service should throw exception") {
+    addTokenToUGI()
+
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+    }
+
+    assert(thrown.getMessage contains "Kerberos service name must be defined")
+  }
+
+  test("getTokenJaasParams with token should return scram module") {
+    addTokenToUGI()
+    sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+    val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+
+    assert(jaasParams.contains("ScramLoginModule required"))
+    assert(jaasParams.contains("tokenauth=true"))
+    assert(jaasParams.contains(tokenId))
+    assert(jaasParams.contains(tokenPassword))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index ea18b7e..333572e 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -28,8 +28,6 @@
   <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
-    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>2.1.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.10</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/0166c737/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3ca2f73..dfc3c54 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,8 @@
     <hive.version>1.2.1.spark2</hive.version>
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
+    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
+    <kafka.version>2.1.0</kafka.version>
     <derby.version>10.12.1.1</derby.version>
     <parquet.version>1.10.0</parquet.version>
     <orc.version>1.5.3</orc.version>

