Repository: spark
Updated Branches:
  refs/heads/master 354e93618 -> 95efc895e


[SPARK-18588][SS][KAFKA] Create a new KafkaConsumer when error happens to fix the flaky test

## What changes were proposed in this pull request?

When KafkaSource fails on Kafka errors, we should create a new consumer to retry rather than
reuse the existing broken one, because the broken one is likely to fail again.

This PR also assigns a new group id to each newly created consumer, to guard against a possible
race condition: the broken consumer may be unable to talk to the Kafka cluster in `close` while
the new consumer can already talk to the cluster. I'm not sure whether this can actually happen;
it's just a safety measure to avoid the Kafka cluster thinking there are two consumers with the
same group id in a short time window. (Note: CachedKafkaConsumer doesn't need this fix since
`assign` never uses the group id.)
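For illustration, the group-id rotation looks roughly like this (the `DriverConsumerFactory` wrapper is hypothetical; in the patch the logic lives directly in `KafkaSource`):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}

// Hypothetical wrapper for illustration: each new driver-side consumer gets a
// fresh group id derived from a per-source prefix, so a half-closed broken
// consumer and its replacement never share a group id on the broker side.
class DriverConsumerFactory(
    driverGroupIdPrefix: String,
    driverKafkaParams: ju.Map[String, Object]) {

  private var nextId = 0

  private def nextGroupId(): String = {
    val groupId = driverGroupIdPrefix + "-" + nextId
    nextId += 1
    groupId
  }

  def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
    val params = new ju.HashMap[String, Object](driverKafkaParams)
    params.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
    new KafkaConsumer[Array[Byte], Array[Byte]](params)
  }
}
```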

## How was this patch tested?

The flaky test was run 120 times in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/70370/console and all runs passed.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #16282 from zsxwing/kafka-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95efc895
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95efc895
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95efc895

Branch: refs/heads/master
Commit: 95efc895e929701a605313b87ad0cd91edee2f81
Parents: 354e936
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Dec 21 15:39:36 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 21 15:39:36 2016 -0800

----------------------------------------------------------------------
 dev/sparktestsupport/modules.py                 |  3 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala | 58 ++++++++++++++------
 .../sql/kafka010/KafkaSourceProvider.scala      | 21 +++----
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  2 +-
 4 files changed, 52 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 1a7cf9a..10ad1fe 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -245,7 +245,8 @@ streaming_kafka_0_10 = Module(
     name="streaming-kafka-0-10",
     dependencies=[streaming],
     source_file_regexes=[
-        "external/kafka-0-10",
+        # The ending "/" is necessary otherwise it will include "sql-kafka" codes
+        "external/kafka-0-10/",
         "external/kafka-0-10-assembly",
     ],
     sbt_test_goals=[

http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 92ee0ed..43b8d9d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
@@ -81,14 +81,16 @@ import org.apache.spark.util.UninterruptibleThread
  * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers
  * and not use wrong broker addresses.
  */
-private[kafka010] case class KafkaSource(
+private[kafka010] class KafkaSource(
     sqlContext: SQLContext,
     consumerStrategy: ConsumerStrategy,
+    driverKafkaParams: ju.Map[String, Object],
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
     startingOffsets: StartingOffsets,
-    failOnDataLoss: Boolean)
+    failOnDataLoss: Boolean,
+    driverGroupIdPrefix: String)
   extends Source with Logging {
 
   private val sc = sqlContext.sparkContext
@@ -107,11 +109,31 @@ private[kafka010] case class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
 
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+    groupId = driverGroupIdPrefix + "-" + nextId
+    nextId += 1
+    groupId
+  }
+
   /**
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
-  private val consumer = consumerStrategy.createConsumer()
+  private var consumer: Consumer[Array[Byte], Array[Byte]] = createConsumer()
+
+  /**
+   * Create a consumer using the new generated group id. We always use a new consumer to avoid
+   * just using a broken consumer to retry on Kafka errors, which likely will fail again.
+   */
+  private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
+    val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
+    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+    consumerStrategy.createConsumer(newKafkaParams)
+  }
 
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
@@ -171,6 +193,11 @@ private[kafka010] case class KafkaSource(
     Some(KafkaSourceOffset(offsets))
   }
 
+  private def resetConsumer(): Unit = synchronized {
+    consumer.close()
+    consumer = createConsumer()
+  }
+
   /** Proportionally distribute limit number of offsets among topicpartitions */
   private def rateLimit(
       limit: Long,
@@ -441,13 +468,12 @@ private[kafka010] case class KafkaSource(
               try {
                 result = Some(body)
               } catch {
-                case x: OffsetOutOfRangeException =>
-                  reportDataLoss(x.getMessage)
                 case NonFatal(e) =>
                   lastException = e
                   logWarning(s"Error in attempt $attempt getting Kafka 
offsets: ", e)
                   attempt += 1
                   Thread.sleep(offsetFetchAttemptIntervalMs)
+                  resetConsumer()
               }
             }
           case _ =>
@@ -511,12 +537,12 @@ private[kafka010] object KafkaSource {
   ))
 
   sealed trait ConsumerStrategy {
-    def createConsumer(): Consumer[Array[Byte], Array[Byte]]
+    def createConsumer(kafkaParams: ju.Map[String, Object]): 
Consumer[Array[Byte], Array[Byte]]
   }
 
-  case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object])
-    extends ConsumerStrategy {
-    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+  case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
+    override def createConsumer(
+        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
       consumer.assign(ju.Arrays.asList(partitions: _*))
       consumer
@@ -525,9 +551,9 @@ private[kafka010] object KafkaSource {
     override def toString: String = s"Assign[${partitions.mkString(", ")}]"
   }
 
-  case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
-    extends ConsumerStrategy {
-    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+  case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
+    override def createConsumer(
+        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
       consumer.subscribe(topics.asJava)
       consumer
@@ -536,10 +562,10 @@ private[kafka010] object KafkaSource {
     override def toString: String = s"Subscribe[${topics.mkString(", ")}]"
   }
 
-  case class SubscribePatternStrategy(
-    topicPattern: String, kafkaParams: ju.Map[String, Object])
+  case class SubscribePatternStrategy(topicPattern: String)
     extends ConsumerStrategy {
-    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
+    override def createConsumer(
+        kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
       consumer.subscribe(
         ju.regex.Pattern.compile(topicPattern),

http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 585ced8..aa01238 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -85,14 +85,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         case None => LatestOffsets
       }
 
-    val kafkaParamsForStrategy =
+    val kafkaParamsForDriver =
       ConfigUpdater("source", specifiedKafkaParams)
         .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
         .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
 
-        // So that consumers in Kafka source do not mess with any existing group id
-        .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
-
         // Set to "earliest" to avoid exceptions. However, KafkaSource will 
fetch the initial
         // offsets by itself instead of counting on KafkaConsumer.
         .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -129,17 +126,11 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
       case ("assign", value) =>
-        AssignStrategy(
-          JsonUtils.partitions(value),
-          kafkaParamsForStrategy)
+        AssignStrategy(JsonUtils.partitions(value))
       case ("subscribe", value) =>
-        SubscribeStrategy(
-          value.split(",").map(_.trim()).filter(_.nonEmpty),
-          kafkaParamsForStrategy)
+        SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
       case ("subscribepattern", value) =>
-        SubscribePatternStrategy(
-          value.trim(),
-          kafkaParamsForStrategy)
+        SubscribePatternStrategy(value.trim())
       case _ =>
         // Should never reach here as we are already matching on
         // matched strategy names
@@ -152,11 +143,13 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     new KafkaSource(
       sqlContext,
       strategy,
+      kafkaParamsForDriver,
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
       startingOffsets,
-      failOnDataLoss)
+      failOnDataLoss,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
   }
 
   private def validateOptions(parameters: Map[String, String]): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/95efc895/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 5d2779a..544fbc5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -845,7 +845,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     }
   }
 
-  ignore("stress test for failOnDataLoss=false") {
+  test("stress test for failOnDataLoss=false") {
     val reader = spark
       .readStream
       .format("kafka")

