This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 523818a  [SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit
523818a is described below

commit 523818a8ceda2b82e06089ee657e0bd92c4603e5
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Wed Jul 10 09:35:39 2019 -0700

    [SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit
    
    `DirectKafkaStreamSuite.offset recovery from kafka` commits offsets to
    Kafka with the `Consumer.commitAsync` API (and then reads them back). Since
    this API is asynchronous, it may send its notifications late (or not at
    all). The test currently assumes that if the data has been sent and
    collected, then the offset must have been committed as well. This is not
    true.
    
    In this PR I've made the following modifications:
    * Wait for the async offset commit before the context is stopped
    * Added a "commit succeeded" log message to see whether the commit notification arrived at all
    * Use `ConcurrentHashMap` for the committed offsets because 2 threads are using the variable (`JobGenerator` and `ScalaTest...`)
    
    Tested by running the existing unit test in a loop, plus Jenkins runs.
    
    Closes #25100 from gaborgsomogyi/SPARK-28335.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 579edf472822802285b5cd7d07f63503015eff5a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 453b5e5..375409c 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010
 import java.io.File
 import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
 
@@ -423,7 +424,7 @@ class DirectKafkaStreamSuite
     )
 
     val collectedData = new ConcurrentLinkedQueue[String]()
-    val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
+    val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]()
 
     // Send data to Kafka and wait for it to be received
     def sendDataAndWaitForReceive(data: Seq[Int]) {
@@ -452,6 +453,7 @@ class DirectKafkaStreamSuite
                 logError("commit failed", e)
               } else {
                 committed.putAll(m)
+                logDebug(s"commit succeeded: $m")
               }
             }
           })
@@ -462,8 +464,10 @@ class DirectKafkaStreamSuite
     for (i <- (1 to 10).grouped(4)) {
       sendDataAndWaitForReceive(i)
     }
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
+      assert(!committed.isEmpty)
+    }
     ssc.stop()
-    assert(! committed.isEmpty)
     val consumer = new KafkaConsumer[String, String](kafkaParams)
     consumer.subscribe(Arrays.asList(topic))
     consumer.poll(0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to