Repository: spark Updated Branches: refs/heads/master 6959061f0 -> 03c27435a
[TEST][STREAMING] Fix flaky Kafka rate controlling test ## What changes were proposed in this pull request? The current test is incorrect, because - The expected number of messages does not take into account that the topic has 2 partitions, and rate is set per partition. - Also in some cases, the test ran out of data in Kafka while waiting for the right amount of data per batch. The PR - Reduces the number of partitions to 1 - Adds more data to Kafka - Runs with 0.5 second so that batches are created slowly ## How was this patch tested? Ran many times locally, going to run it many times in Jenkins (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #14361 from tdas/kafka-rate-test-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03c27435 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03c27435 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03c27435 Branch: refs/heads/master Commit: 03c27435aee4e319abe290771ba96e69469109ac Parents: 6959061 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Jul 26 00:41:46 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Jul 26 00:41:46 2016 -0700 ---------------------------------------------------------------------- .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/03c27435/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index c9e15bc..b1d90b8 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -544,15 +544,14 @@ class DirectKafkaStreamSuite test("using rate controller") { val topic = "backpressure" - val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - kafkaTestUtils.createTopic(topic, 2) + kafkaTestUtils.createTopic(topic, 1) val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest") val executorKafkaParams = new JHashMap[String, Object](kafkaParams) KafkaUtils.fixKafkaParams(executorKafkaParams) - val batchIntervalMilliseconds = 100 + val batchIntervalMilliseconds = 500 val estimator = new ConstantEstimator(100) - val messages = Map("foo" -> 200) + val messages = Map("foo" -> 5000) kafkaTestUtils.sendMessages(topic, messages) val sparkConf = new SparkConf() @@ -596,7 +595,7 @@ class DirectKafkaStreamSuite estimator.updateRate(rate) // Set a new rate. // Expect blocks of data equal to "rate", scaled by the interval length in secs. val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) - eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { + eventually(timeout(5.seconds), interval(10 milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. assert(collectedData.asScala.exists(_.size == expectedSize), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org