Repository: spark
Updated Branches:
  refs/heads/branch-2.2 753e129f3 -> 3d53d825e


[SPARK-20452][SS][KAFKA] Fix a potential ConcurrentModificationException for batch Kafka DataFrame

## What changes were proposed in this pull request?

When a batch Kafka query is cancelled but one of its tasks cannot be cancelled, rerunning the 
same DataFrame may cause a ConcurrentModificationException because it may launch 
two tasks sharing the same group id.

This PR always creates a new consumer when `reuseKafkaConsumer = false` to avoid the 
ConcurrentModificationException. It also contains other minor fixes.
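
For context, a minimal sketch of the idea (not the actual Spark code): `KafkaDataConsumer` is a hypothetical stand-in for `CachedKafkaConsumer`, and the factory callbacks model `getOrCreate` and the `createUncached` method added in this patch; the real changes are in the diff below.

```scala
// Minimal sketch, assuming simplified stand-ins for the real classes:
//   1. each batch query gets its own unique group id, and
//   2. when consumers cannot be reused across tasks, a brand-new uncached
//      consumer is created so two concurrent tasks never share one Kafka
//      consumer instance (the source of the ConcurrentModificationException).
import java.util.UUID

object ConsumerSelectionSketch {
  trait KafkaDataConsumer { def close(): Unit }  // hypothetical stand-in

  // Mirrors the s"spark-kafka-relation-${UUID.randomUUID}" group id in KafkaRelation.
  def uniqueGroupId(): String = s"spark-kafka-relation-${UUID.randomUUID}"

  // Models the choice made in KafkaSourceRDD.compute; the callbacks stand in
  // for CachedKafkaConsumer.getOrCreate and CachedKafkaConsumer.createUncached.
  def select(
      reuseKafkaConsumer: Boolean,
      getOrCreateCached: () => KafkaDataConsumer,
      createUncached: () => KafkaDataConsumer): KafkaDataConsumer = {
    if (reuseKafkaConsumer) getOrCreateCached() else createUncached()
  }
}
```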

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17752 from zsxwing/kafka-fix.

(cherry picked from commit 823baca2cb8edb62885af547d3511c9e8923cefd)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d53d825
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d53d825
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d53d825

Branch: refs/heads/branch-2.2
Commit: 3d53d825e8b3f5fad770398992d6c4f5341ce00e
Parents: 753e129
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Apr 27 13:58:44 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Apr 27 13:58:54 2017 -0700

----------------------------------------------------------------------
 .../sql/kafka010/CachedKafkaConsumer.scala      |  12 +-
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |   6 +-
 .../spark/sql/kafka010/KafkaRelation.scala      |  30 +++-
 .../sql/kafka010/KafkaSourceProvider.scala      | 147 +++++++++----------
 .../spark/sql/kafka010/KafkaSourceRDD.scala     |  19 ++-
 .../spark/streaming/kafka010/KafkaRDD.scala     |   2 +-
 6 files changed, 119 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index bf6c090..7c4f38e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -287,7 +287,7 @@ private[kafka010] case class CachedKafkaConsumer private(
     reportDataLoss0(failOnDataLoss, finalMessage, cause)
   }
 
-  private def close(): Unit = consumer.close()
+  def close(): Unit = consumer.close()
 
   private def seek(offset: Long): Unit = {
     logDebug(s"Seeking to $groupId $topicPartition $offset")
@@ -382,7 +382,7 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
 
     // If this is reattempt at running the task, then invalidate cache and start with
     // a new consumer
-    if (TaskContext.get != null && TaskContext.get.attemptNumber > 1) {
+    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
       removeKafkaConsumer(topic, partition, kafkaParams)
       val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
       consumer.inuse = true
@@ -398,6 +398,14 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }
 
+  /** Create an [[CachedKafkaConsumer]] but don't put it into cache. */
+  def createUncached(
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = {
+    new CachedKafkaConsumer(new TopicPartition(topic, partition), kafkaParams)
+  }
+
   private def reportDataLoss0(
       failOnDataLoss: Boolean,
       finalMessage: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 2696d6f..3e65949 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -95,8 +95,10 @@ private[kafka010] class KafkaOffsetReader(
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    consumer.close()
-    kafkaReaderThread.shutdownNow()
+    runUninterruptibly {
+      consumer.close()
+    }
+    kafkaReaderThread.shutdown()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index f180bba..97bd283 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.UUID
 
 import org.apache.kafka.common.TopicPartition
 
@@ -33,9 +34,9 @@ import org.apache.spark.unsafe.types.UTF8String
 
 private[kafka010] class KafkaRelation(
     override val sqlContext: SQLContext,
-    kafkaReader: KafkaOffsetReader,
-    executorKafkaParams: ju.Map[String, Object],
+    strategy: ConsumerStrategy,
     sourceOptions: Map[String, String],
+    specifiedKafkaParams: Map[String, String],
     failOnDataLoss: Boolean,
     startingOffsets: KafkaOffsetRangeLimit,
     endingOffsets: KafkaOffsetRangeLimit)
@@ -53,9 +54,27 @@ private[kafka010] class KafkaRelation(
   override def schema: StructType = KafkaOffsetReader.kafkaSchema
 
   override def buildScan(): RDD[Row] = {
+    // Each running query should use its own group id. Otherwise, the query may be only assigned
+    // partial data since Kafka will assign partitions to multiple consumers having the same group
+    // id. Hence, we should generate a unique id for each query.
+    val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
+
+    val kafkaOffsetReader = new KafkaOffsetReader(
+      strategy,
+      KafkaSourceProvider.kafkaParamsForDriver(specifiedKafkaParams),
+      sourceOptions,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
     // Leverage the KafkaReader to obtain the relevant partition offsets
-    val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
-    val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+    val (fromPartitionOffsets, untilPartitionOffsets) = {
+      try {
+        (getPartitionOffsets(kafkaOffsetReader, startingOffsets),
+          getPartitionOffsets(kafkaOffsetReader, endingOffsets))
+      } finally {
+        kafkaOffsetReader.close()
+      }
+    }
+
     // Obtain topicPartitions in both from and until partition offset, ignoring
     // topic partitions that were added and/or deleted between the two above calls.
     if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
@@ -82,6 +101,8 @@ private[kafka010] class KafkaRelation(
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
+    val executorKafkaParams =
+      KafkaSourceProvider.kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId)
     val rdd = new KafkaSourceRDD(
       sqlContext.sparkContext, executorKafkaParams, offsetRanges,
       pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
@@ -98,6 +119,7 @@ private[kafka010] class KafkaRelation(
   }
 
   private def getPartitionOffsets(
+      kafkaReader: KafkaOffsetReader,
       kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = {
     def validateTopicPartitions(partitions: Set[TopicPartition],
       partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index ab1ce34..3cb4d8c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -111,10 +111,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     validateBatchOptions(parameters)
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
     val specifiedKafkaParams =
       parameters
@@ -131,20 +127,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
     assert(endingRelationOffsets != EarliestOffsetRangeLimit)
 
-    val kafkaOffsetReader = new KafkaOffsetReader(
-      strategy(caseInsensitiveParams),
-      kafkaParamsForDriver(specifiedKafkaParams),
-      parameters,
-      driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
     new KafkaRelation(
       sqlContext,
-      kafkaOffsetReader,
-      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-      parameters,
-      failOnDataLoss(caseInsensitiveParams),
-      startingRelationOffsets,
-      endingRelationOffsets)
+      strategy(caseInsensitiveParams),
+      sourceOptions = parameters,
+      specifiedKafkaParams = specifiedKafkaParams,
+      failOnDataLoss = failOnDataLoss(caseInsensitiveParams),
+      startingOffsets = startingRelationOffsets,
+      endingOffsets = endingRelationOffsets)
   }
 
   override def createSink(
@@ -213,46 +203,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
   }
 
-  private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
-    ConfigUpdater("source", specifiedKafkaParams)
-      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
-
-      // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
-      // offsets by itself instead of counting on KafkaConsumer.
-      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-      // So that consumers in the driver does not commit offsets unnecessarily
-      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-      // So that the driver does not pull too much data
-      .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
-
-      // If buffer config is not set, set it to reasonable value to work around
-      // buffer issues (see KAFKA-3135)
-      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .build()
-
-  private def kafkaParamsForExecutors(
-      specifiedKafkaParams: Map[String, String], uniqueGroupId: String) =
-    ConfigUpdater("executor", specifiedKafkaParams)
-      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
-
-      // Make sure executors do only what the driver tells them.
-      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
-
-      // So that consumers in executors do not mess with any existing group id
-      .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
-
-      // So that consumers in executors does not commit offsets unnecessarily
-      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-      // If buffer config is not set, set it to reasonable value to work around
-      // buffer issues (see KAFKA-3135)
-      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .build()
-
   private def strategy(caseInsensitiveParams: Map[String, String]) =
       caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
     case ("assign", value) =>
@@ -414,30 +364,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       logWarning("maxOffsetsPerTrigger option ignored in batch queries")
     }
   }
-
-  /** Class to conveniently update Kafka config params, while logging the changes */
-  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
-    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
-    def set(key: String, value: Object): this.type = {
-      map.put(key, value)
-      logInfo(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
-      this
-    }
-
-    def setIfUnset(key: String, value: Object): ConfigUpdater = {
-      if (!map.containsKey(key)) {
-        map.put(key, value)
-        logInfo(s"$module: Set $key to $value")
-      }
-      this
-    }
-
-    def build(): ju.Map[String, Object] = map
-  }
 }
 
-private[kafka010] object KafkaSourceProvider {
+private[kafka010] object KafkaSourceProvider extends Logging {
   private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
@@ -459,4 +388,66 @@ private[kafka010] object KafkaSourceProvider {
       case None => defaultOffsets
     }
   }
+
+  def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
+    ConfigUpdater("source", specifiedKafkaParams)
+      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
+      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
+
+      // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
+      // offsets by itself instead of counting on KafkaConsumer.
+      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+      // So that consumers in the driver does not commit offsets unnecessarily
+      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
+      // So that the driver does not pull too much data
+      .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
+
+      // If buffer config is not set, set it to reasonable value to work around
+      // buffer issues (see KAFKA-3135)
+      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+      .build()
+
+  def kafkaParamsForExecutors(
+      specifiedKafkaParams: Map[String, String],
+      uniqueGroupId: String): ju.Map[String, Object] =
+    ConfigUpdater("executor", specifiedKafkaParams)
+      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
+      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
+
+      // Make sure executors do only what the driver tells them.
+      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+
+      // So that consumers in executors do not mess with any existing group id
+      .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+
+      // So that consumers in executors does not commit offsets unnecessarily
+      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
+      // If buffer config is not set, set it to reasonable value to work around
+      // buffer issues (see KAFKA-3135)
+      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+      .build()
+
+  /** Class to conveniently update Kafka config params, while logging the changes */
+  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
+    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
+
+    def set(key: String, value: Object): this.type = {
+      map.put(key, value)
+      logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+      this
+    }
+
+    def setIfUnset(key: String, value: Object): ConfigUpdater = {
+      if (!map.containsKey(key)) {
+        map.put(key, value)
+        logDebug(s"$module: Set $key to $value")
+      }
+      this
+    }
+
+    def build(): ju.Map[String, Object] = map
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 6fb3473..9d9e2aa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -125,16 +125,15 @@ private[kafka010] class KafkaSourceRDD(
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
     val topic = sourcePartition.offsetRange.topic
-    if (!reuseKafkaConsumer) {
-      // if we can't reuse CachedKafkaConsumers, let's reset the groupId to something unique
-      // to each task (i.e., append the task's unique partition id), because we will have
-      // multiple tasks (e.g., in the case of union) reading from the same topic partitions
-      val old = executorKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-      val id = TaskContext.getPartitionId()
-      executorKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, old + "-" + id)
-    }
     val kafkaPartition = sourcePartition.offsetRange.partition
-    val consumer = CachedKafkaConsumer.getOrCreate(topic, kafkaPartition, executorKafkaParams)
+    val consumer =
+      if (!reuseKafkaConsumer) {
+        // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
+        // uses `assign`, we don't need to worry about the "group.id" conflicts.
+        CachedKafkaConsumer.createUncached(topic, kafkaPartition, executorKafkaParams)
+      } else {
+        CachedKafkaConsumer.getOrCreate(topic, kafkaPartition, executorKafkaParams)
+      }
     val range = resolveRange(consumer, sourcePartition.offsetRange)
     assert(
       range.fromOffset <= range.untilOffset,
@@ -170,7 +169,7 @@ private[kafka010] class KafkaSourceRDD(
         override protected def close(): Unit = {
           if (!reuseKafkaConsumer) {
             // Don't forget to close non-reuse KafkaConsumers. You may take down your cluster!
-            CachedKafkaConsumer.removeKafkaConsumer(topic, kafkaPartition, executorKafkaParams)
+            consumer.close()
           } else {
             // Indicate that we're no longer using this consumer
             CachedKafkaConsumer.releaseKafkaConsumer(topic, kafkaPartition, executorKafkaParams)

http://git-wip-us.apache.org/repos/asf/spark/blob/3d53d825/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 4c6e2ce..62cdf5b 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -199,7 +199,7 @@ private[spark] class KafkaRDD[K, V](
 
     val consumer = if (useConsumerCache) {
       CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
-      if (context.attemptNumber > 1) {
+      if (context.attemptNumber >= 1) {
         // just in case the prior attempt failures were cache related
         CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
       }

