This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new 523818a [SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit 523818a is described below commit 523818a8ceda2b82e06089ee657e0bd92c4603e5 Author: Gabor Somogyi <gabor.g.somo...@gmail.com> AuthorDate: Wed Jul 10 09:35:39 2019 -0700 [SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka async commit `DirectKafkaStreamSuite.offset recovery from kafka` commits offsets to Kafka with the `Consumer.commitAsync` API (and then reads it back). Since this API is asynchronous, it may send notifications late (or not at all). The actual test makes the assumption that if the data is sent and collected, then the offset must be committed as well. This is not true. In this PR I've made the following modifications: * Wait for the async offset commit before the context is stopped * Added a commit-success log to see whether the notification arrived at all * Using `ConcurrentHashMap` for committed offsets because 2 threads are using the variable (`JobGenerator` and `ScalaTest...`) Existing unit test in a loop + jenkins runs. Closes #25100 from gaborgsomogyi/SPARK-28335.
Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 579edf472822802285b5cd7d07f63503015eff5a) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 453b5e5..375409c 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka010 import java.io.File import java.lang.{ Long => JLong } import java.util.{ Arrays, HashMap => JHashMap, Map => JMap } +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong @@ -423,7 +424,7 @@ class DirectKafkaStreamSuite ) val collectedData = new ConcurrentLinkedQueue[String]() - val committed = new JHashMap[TopicPartition, OffsetAndMetadata]() + val committed = new ConcurrentHashMap[TopicPartition, OffsetAndMetadata]() // Send data to Kafka and wait for it to be received def sendDataAndWaitForReceive(data: Seq[Int]) { @@ -452,6 +453,7 @@ class DirectKafkaStreamSuite logError("commit failed", e) } else { committed.putAll(m) + logDebug(s"commit succeeded: $m") } } }) @@ -462,8 +464,10 @@ class DirectKafkaStreamSuite for (i <- (1 to 10).grouped(4)) { sendDataAndWaitForReceive(i) } + eventually(timeout(10.seconds), interval(50.milliseconds)) { + assert(!committed.isEmpty) + } ssc.stop() - assert(! 
committed.isEmpty) val consumer = new KafkaConsumer[String, String](kafkaParams) consumer.subscribe(Arrays.asList(topic)) consumer.poll(0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org