Repository: spark
Updated Branches:
  refs/heads/branch-2.0 05ddc7517 -> 920162a1e


[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from 
development on 0-10.

The main changes are:
- eliminating per-send waits on a concurrent queue in favor of a single assert on the received results (see the sketch after this list),
- using atomics instead of volatile variables (although this probably doesn't matter here),
- increasing the uniqueness of topic names.
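
The queue-wait versus single-assert change follows a standard ScalaTest pattern. Below is a minimal, self-contained sketch of that pattern, not the suite's actual code: `total` and the background thread are hypothetical stand-ins for the real stream, and ScalaTest's `Eventually` and `SpanSugar` are assumed on the classpath. All data is published up front, then one `eventually` block retries a single assertion until it holds or times out, instead of blocking after every send.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch extends App {
  // Shared result cell, written by the "stream" and read by the test.
  val total = new AtomicLong(-1L)

  // Stand-in for the foreachRDD callback that publishes the running count.
  new Thread(() => { Thread.sleep(200); total.set((1 to 10).sum) }).start()

  // Retry the assertion every 50 ms, failing only after 10 s.
  eventually(timeout(10.seconds), interval(50.milliseconds)) {
    assert(total.get == (1 to 10).sum)
  }
  println(s"observed total: ${total.get}")
}
```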

## How was this patch tested?

Unit tests

Author: cody koeninger <c...@koeninger.org>

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.

(cherry picked from commit b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/920162a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/920162a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/920162a1

Branch: refs/heads/branch-2.0
Commit: 920162a1e0b43b558ba2242868a44cad06bef946
Parents: 05ddc75
Author: cody koeninger <c...@koeninger.org>
Authored: Wed Jul 6 16:21:41 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jul 6 16:21:49 2016 -0700

----------------------------------------------------------------------
 .../kafka/DirectKafkaStreamSuite.scala          | 41 ++++++++++----------
 .../spark/streaming/kafka/KafkaRDDSuite.scala   |  8 ++--
 2 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/920162a1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d2..ab1c505 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
     )
 
     // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
+    def sendData(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
     }
 
     // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
     }
     ssc.checkpoint(testDir.getAbsolutePath)
 
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
-
    // This is to ensure all the data is eventually received only once
     stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
     }
     ssc.start()
 
-    // Send some data and wait for them to be received
+    // Send some data
     for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
     ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
    val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
     // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+        earlierOffsetRanges.contains((or._1, or._2))
       },
       "Recovered ranges are not the same as the ones generated\n" +
         s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+        s"earlierOffsetRanges: $earlierOffsetRanges"
     )
     // Restart context, give more data and verify the total at the end
    // If the total is right, that means each record has been received only once
     ssc.start()
-    sendDataAndWaitForReceive(11 to 20)
+    for (i <- (11 to 20).grouped(4)) {
+      sendData(i)
+    }
+
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
-      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+      assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
   }
@@ -488,8 +488,7 @@ class DirectKafkaStreamSuite
 }
 
 object DirectKafkaStreamSuite {
-  val collectedData = new ConcurrentLinkedQueue[String]()
-  @volatile var total = -1L
+  val total = new AtomicLong(-1L)
 
   class InputInfoCollector extends StreamingListener {
     val numRecordsSubmitted = new AtomicLong(0L)
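
The hunk above replaces `@volatile var total = -1L` with an `AtomicLong`. As the commit message notes, this probably makes no observable difference for a single-writer, single-reader test, since both guarantee cross-thread visibility. A minimal sketch of the two variants for comparison, with hypothetical names that are not part of the suite:

```scala
import java.util.concurrent.atomic.AtomicLong

object VolatileVariant {
  // Visibility only: safe while exactly one thread writes.
  @volatile var total: Long = -1L
}

object AtomicVariant {
  // Visibility plus atomic read-modify-write, so the counter stays
  // correct even if the test ever gains concurrent writers.
  val total = new AtomicLong(-1L)
  def add(n: Long): Long = total.addAndGet(n)
}
```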

http://git-wip-us.apache.org/repos/asf/spark/blob/920162a1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
index 5e539c1..809699a 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -53,13 +53,13 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   test("basic usage") {
-    val topic = s"topicbasic-${Random.nextInt}"
+    val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
     kafkaTestUtils.createTopic(topic)
     val messages = Array("the", "quick", "brown", "fox")
     kafkaTestUtils.sendMessages(topic, messages)
 
    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> 
s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
 
     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
 
@@ -92,12 +92,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("iterator boundary conditions") {
    // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
-    val topic = s"topicboundary-${Random.nextInt}"
+    val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     kafkaTestUtils.createTopic(topic)
 
    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt}")
+      "group.id" -> 
s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
 
     val kc = new KafkaCluster(kafkaParams)
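
The topic- and group-name changes in this file guard against collisions when tests run repeatedly against a broker that may still remember earlier topics: `Random.nextInt` alone can repeat across runs, so a millisecond timestamp is appended. A hypothetical helper (not in the patch) capturing the same naming scheme:

```scala
import scala.util.Random

object TestNames {
  // Combines a random int with the current time so that two test runs
  // (or two suites in one JVM) are very unlikely to reuse a name.
  def unique(prefix: String): String =
    s"$prefix-${Random.nextInt}-${System.currentTimeMillis}"
}

// e.g. TestNames.unique("topicbasic"), TestNames.unique("test-consumer")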
 

