Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6ef923137 -> f6b87939c


[SPARK-17841][STREAMING][KAFKA] drain commitQueue

## What changes were proposed in this pull request?

Drain the commit queue instead of just iterating over it.
iterator() on a concurrent linked queue does not remove items from the queue;
poll() does.
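
The difference can be seen in isolation with a minimal sketch. It uses only `java.util.concurrent.ConcurrentLinkedQueue`; the object and method names (`DrainDemo`, `iterateAll`, `drainAll`) are made up for illustration and are not part of this patch.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative helper, not Spark code.
object DrainDemo {
  // Visits every element via the iterator; the queue itself is left untouched.
  def iterateAll(queue: ConcurrentLinkedQueue[String]): Int = {
    var seen = 0
    val it = queue.iterator()
    while (it.hasNext) {
      it.next()
      seen += 1
    }
    seen
  }

  // Removes elements one by one; poll() returns null once the queue is empty.
  def drainAll(queue: ConcurrentLinkedQueue[String]): Int = {
    var removed = 0
    var item = queue.poll()
    while (item != null) {
      removed += 1
      item = queue.poll()
    }
    removed
  }
}
```

After `iterateAll` the queue still holds every element, so a later pass would see them again; after `drainAll` the queue is empty.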

## How was this patch tested?
Unit tests

Author: cody koeninger <c...@koeninger.org>

Closes #15407 from koeninger/SPARK-17841.

(cherry picked from commit cd106b050ff789b6de539956a7f01159ab15c820)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6b87939
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6b87939
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6b87939

Branch: refs/heads/branch-2.0
Commit: f6b87939cb90bf4a0996b3728c1bccdf5e24dd4e
Parents: 6ef9231
Author: cody koeninger <c...@koeninger.org>
Authored: Tue Oct 18 14:01:49 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Oct 18 14:01:59 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaInputDStream.scala     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6b87939/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 432537e..7e57bb1 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -282,13 +282,13 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected def commitAll(): Unit = {
     val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
-    val it = commitQueue.iterator()
-    while (it.hasNext) {
-      val osr = it.next
+    var osr = commitQueue.poll()
+    while (null != osr) {
       val tp = osr.topicPartition
       val x = m.get(tp)
       val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
       m.put(tp, new OffsetAndMetadata(offset))
+      osr = commitQueue.poll()
     }
     if (!m.isEmpty) {
       consumer.commitAsync(m, commitCallback.get)
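
The patched loop drains the queue and keeps the highest untilOffset seen for each partition. A self-contained sketch of that pattern follows, assuming a hypothetical `PendingCommit` case class in place of Spark's offset range type and a plain Scala map in place of the `ju.HashMap` of Kafka types; it leaves out the Kafka consumer and the commit call.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for an offset range; not the Spark or Kafka class.
final case class PendingCommit(topic: String, partition: Int, untilOffset: Long)

// Illustrative helper, not Spark code.
object CommitMerge {
  // Empties the queue and returns the highest untilOffset per (topic, partition).
  def drainToHighestOffsets(
      queue: ConcurrentLinkedQueue[PendingCommit]): Map[(String, Int), Long] = {
    val highest = mutable.Map.empty[(String, Int), Long]
    var pending = queue.poll()
    while (pending != null) {
      val key = (pending.topic, pending.partition)
      val offset = highest.get(key) match {
        case Some(seen) => math.max(seen, pending.untilOffset)
        case None => pending.untilOffset
      }
      highest.update(key, offset)
      pending = queue.poll() // advance; null ends the loop
    }
    highest.toMap
  }
}
```

Because the queue is empty when the method returns, a second call on the same queue yields an empty map unless new entries were added in between.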

