Repository: spark
Updated Branches:
  refs/heads/master 6959061f0 -> 03c27435a


[TEST][STREAMING] Fix flaky Kafka rate controlling test

## What changes were proposed in this pull request?

The current test is incorrect because:
- The expected number of messages does not take into account that the topic has 2 partitions, and the rate is set per partition.
- In some cases, the test ran out of data in Kafka while waiting for the right amount of data per batch.

This PR:
- Reduces the number of partitions to 1
- Adds more data to Kafka
- Uses a 0.5 second batch interval so that batches are created more slowly
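The arithmetic behind the fix can be sketched as follows. This is a hypothetical standalone snippet, not code from the suite: `expectedBatchSize` mirrors the `Math.round(rate * batchIntervalMilliseconds * 0.001)` expression in the diff below, and the partition counts and message volumes are taken from the old and new test settings.

```scala
// Sketch of why the expected batch size must account for partition count
// when the rate limit is applied per partition (names are illustrative).
object RateCheck {
  // Messages expected per batch for a single partition: rate (msgs/sec)
  // scaled by the batch interval in seconds.
  def expectedBatchSize(ratePerPartition: Long, batchIntervalMs: Long): Long =
    Math.round(ratePerPartition * batchIntervalMs * 0.001)

  def main(args: Array[String]): Unit = {
    // Old settings: rate 100, 100 ms batches, 2 partitions, 200 messages.
    // Each batch actually pulls 10 records per partition (20 in total),
    // so a check against the per-partition figure alone is wrong, and
    // 200 messages are exhausted after only 200 / 20 = 10 batches.
    val oldPerPartition = expectedBatchSize(100, 100)      // 10
    val oldTotal = 2 * oldPerPartition                     // 20

    // New settings: 1 partition, 500 ms batches, 5000 messages.
    // Per-partition and total sizes coincide, and 5000 / 50 = 100 batches
    // fit before the topic runs dry.
    val newExpected = expectedBatchSize(100, 500)          // 50

    println(s"old: $oldPerPartition per partition, $oldTotal total")
    println(s"new: $newExpected per batch")
  }
}
```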

## How was this patch tested?
Ran the test many times locally; it will also be run many times in Jenkins.


Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #14361 from tdas/kafka-rate-test-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03c27435
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03c27435
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03c27435

Branch: refs/heads/master
Commit: 03c27435aee4e319abe290771ba96e69469109ac
Parents: 6959061
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Jul 26 00:41:46 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 26 00:41:46 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala   | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03c27435/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index c9e15bc..b1d90b8 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -544,15 +544,14 @@ class DirectKafkaStreamSuite
 
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
+    kafkaTestUtils.createTopic(topic, 1)
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
     val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
     KafkaUtils.fixKafkaParams(executorKafkaParams)
 
-    val batchIntervalMilliseconds = 100
+    val batchIntervalMilliseconds = 500
     val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
+    val messages = Map("foo" -> 5000)
     kafkaTestUtils.sendMessages(topic, messages)
 
     val sparkConf = new SparkConf()
@@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate)  // Set a new rate.
      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+      eventually(timeout(5.seconds), interval(10 milliseconds)) {
        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
        // Funky "-" in message makes the complete assertion message read better.
         assert(collectedData.asScala.exists(_.size == expectedSize),

