Repository: spark
Updated Branches:
  refs/heads/master 9ce7d3e54 -> f9a56a153


[SPARK-17782][STREAMING][KAFKA] alternative eliminate race condition of poll twice

## What changes were proposed in this pull request?

Alternative approach to https://github.com/apache/spark/pull/15387

Author: cody koeninger <c...@koeninger.org>

Closes #15401 from koeninger/SPARK-17782-alt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9a56a15
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9a56a15
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9a56a15

Branch: refs/heads/master
Commit: f9a56a153e0579283160519065c7f3620d12da3e
Parents: 9ce7d3e
Author: cody koeninger <c...@koeninger.org>
Authored: Wed Oct 12 15:22:06 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Oct 12 15:22:06 2016 -0700

----------------------------------------------------------------------
 .../streaming/kafka010/ConsumerStrategy.scala   |  4 ++++
 .../kafka010/DirectKafkaInputDStream.scala      | 23 ++++++++++++++++++--
 .../kafka010/DirectKafkaStreamSuite.scala       | 12 +++++-----
 3 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9a56a15/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 60255fc..778c06e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -104,6 +104,8 @@ private case class Subscribe[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
       }
+      // we've called poll, we must pause or next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
@@ -154,6 +156,8 @@ private case class SubscribePattern[K, V](
       toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
       }
+      // we've called poll, we must pause or next poll may consume messages and set position
+      consumer.pause(consumer.assignment())
     }
 
     consumer
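
For context, a minimal sketch of the seek-then-pause pattern this hunk adds, assuming the kafka-clients 0.10 consumer API; the method name and parameters here are illustrative stand-ins, not Spark's own:

import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// hypothetical helper showing the pattern in isolation
def seekThenPause[K, V](
    consumer: KafkaConsumer[K, V],
    toSeek: ju.Map[TopicPartition, java.lang.Long]): Unit = {
  if (!toSeek.isEmpty) {
    // poll(0) joins the group and triggers partition assignment
    consumer.poll(0)
    toSeek.asScala.foreach { case (tp, offset) =>
      consumer.seek(tp, offset)
    }
    // without pausing, a later poll could consume messages and move the
    // position past the offsets we just sought to
    consumer.pause(consumer.assignment())
  }
}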

http://git-wip-us.apache.org/repos/asf/spark/blob/f9a56a15/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 13827f6..432537e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -162,11 +162,30 @@ private[spark] class DirectKafkaInputDStream[K, V](
   }
 
   /**
+   * The concern here is that poll might consume messages despite being paused,
+   * which would throw off consumer position.  Fix position if this happens.
+   */
+  private def paranoidPoll(c: Consumer[K, V]): Unit = {
+    val msgs = c.poll(0)
+    if (!msgs.isEmpty) {
+      // position should be minimum offset per topicpartition
+      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
+        val tp = new TopicPartition(m.topic, m.partition)
+        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
+        acc + (tp -> off)
+      }.foreach { case (tp, off) =>
+          logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
+          c.seek(tp, off)
+      }
+    }
+  }
+
+  /**
    * Returns the latest (highest) available offsets, taking new partitions into account.
    */
   protected def latestOffsets(): Map[TopicPartition, Long] = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     val parts = c.assignment().asScala
 
     // make sure new partitions are reflected in currentOffsets
@@ -223,7 +242,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   override def start(): Unit = {
     val c = consumer
-    c.poll(0)
+    paranoidPoll(c)
     if (currentOffsets.isEmpty) {
       currentOffsets = c.assignment().asScala.map { tp =>
         tp -> c.position(tp)
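
The fold in paranoidPoll reduces the polled records to the minimum offset seen per TopicPartition, then seeks each partition back to undo the unwanted consumption. A self-contained sketch of that reduction, using a hypothetical Rec type in place of ConsumerRecord:

// Rec is a stand-in carrying only the fields the fold needs
case class Rec(topic: String, partition: Int, offset: Long)

val polled = Seq(Rec("t", 0, 12L), Rec("t", 0, 13L), Rec("t", 1, 40L))

// minimum offset per (topic, partition), mirroring paranoidPoll's fold
val minOffsets = polled.foldLeft(Map.empty[(String, Int), Long]) { (acc, r) =>
  val key = (r.topic, r.partition)
  val off = acc.get(key).map(math.min(_, r.offset)).getOrElse(r.offset)
  acc + (key -> off)
}
// minOffsets == Map(("t", 0) -> 12L, ("t", 1) -> 40L); seeking back to these
// offsets restores the positions the paused poll disturbed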

http://git-wip-us.apache.org/repos/asf/spark/blob/f9a56a15/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index e04f35e..02aec43 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -159,17 +159,19 @@ class DirectKafkaStreamSuite
   }
 
   test("pattern based subscription") {
-    val topics = List("pat1", "pat2", "advanced3")
-    // Should match 2 out of 3 topics
+    val topics = List("pat1", "pat2", "pat3", "advanced3")
+    // Should match 3 out of 4 topics
     val pat = """pat\d""".r.pattern
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       kafkaTestUtils.createTopic(t)
       kafkaTestUtils.sendMessages(t, data)
     }
-    val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
-    // 2 matching topics, one of which starts 3 messages later
-    val expectedTotal = (data.values.sum * 2) - 3
+    val offsets = Map(
+      new TopicPartition("pat2", 0) -> 3L,
+      new TopicPartition("pat3", 0) -> 4L)
+    // 3 matching topics, two of which start a total of 7 messages later
+    val expectedTotal = (data.values.sum * 3) - 7
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
 
     ssc = new StreamingContext(sparkConf, Milliseconds(1000))
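
For the record, the expected count works out as follows: each topic receives data.values.sum = 7 + 9 = 16 messages, three topics match the pattern, and the starting offsets skip 3 + 4 = 7 messages, so the stream should see 16 * 3 - 7 = 41 records. A quick check of that arithmetic, using the values from the test itself:

val data = Map("a" -> 7, "b" -> 9)    // 16 messages sent per topic
val matching = 3                      // pat1, pat2, pat3
val skipped = Seq(3L, 4L).sum         // pat2 and pat3 start later
val expectedTotal = data.values.sum * matching - skipped
assert(expectedTotal == 41L)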

