spark git commit: [TEST][STREAMING] Fix flaky Kafka rate controlling test
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4391d4a3c -> 44234b1c4

[TEST][STREAMING] Fix flaky Kafka rate controlling test

## What changes were proposed in this pull request?

The current test is incorrect because:
- The expected number of messages does not take into account that the topic has two partitions, while the rate is set per partition.
- In some cases, the test ran out of data in Kafka while waiting for the right amount of data per batch.

This PR:
- Reduces the number of partitions to 1
- Adds more data to Kafka
- Uses a 0.5 second batch interval so that batches are created slowly

## How was this patch tested?

Ran many times locally; going to run it many times in Jenkins.

Author: Tathagata Das

Closes #14361 from tdas/kafka-rate-test-fix.

(cherry picked from commit 03c27435aee4e319abe290771ba96e69469109ac)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44234b1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44234b1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44234b1c

Branch: refs/heads/branch-2.0
Commit: 44234b1c4266ac7be56892817d043fe6d9ea62f7
Parents: 4391d4a
Author: Tathagata Das
Authored: Tue Jul 26 00:41:46 2016 -0700
Committer: Tathagata Das
Committed: Tue Jul 26 00:41:58 2016 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka010/DirectKafkaStreamSuite.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/44234b1c/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index c9e15bc..b1d90b8 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -544,15 +544,14 @@ class DirectKafkaStreamSuite
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
-    kafkaTestUtils.createTopic(topic, 2)
+    kafkaTestUtils.createTopic(topic, 1)
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
     val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
     KafkaUtils.fixKafkaParams(executorKafkaParams)
-    val batchIntervalMilliseconds = 100
+    val batchIntervalMilliseconds = 500
     val estimator = new ConstantEstimator(100)
-    val messages = Map("foo" -> 200)
+    val messages = Map("foo" -> 5000)
     kafkaTestUtils.sendMessages(topic, messages)

     val sparkConf = new SparkConf()
@@ -596,7 +595,7 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate)  // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+      eventually(timeout(5.seconds), interval(10 milliseconds)) {
        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
        // Funky "-" in message makes the complete assertion message read better.
        assert(collectedData.asScala.exists(_.size == expectedSize),
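For context on the arithmetic behind the fix, here is a minimal, self-contained Scala sketch. It is not part of the patch, and expectedRecords is a hypothetical helper: it only illustrates why the old expectation holds for a single partition but not for two, since the backpressure rate is applied per partition and a batch therefore carries rate x interval x partitions records.

    object ExpectedBatchSize {
      // Records expected in one batch: rate (records/sec/partition) scaled by
      // the batch interval in seconds, then by the number of partitions.
      def expectedRecords(ratePerPartition: Long, batchIntervalMs: Long, numPartitions: Int): Long =
        Math.round(ratePerPartition * batchIntervalMs * 0.001) * numPartitions

      def main(args: Array[String]): Unit = {
        // Old test: 2 partitions, 100 ms batches, rate 100 records/sec/partition.
        // Each batch actually carries 20 records, not the 10 the old assertion implied.
        println(expectedRecords(100, 100, 2))  // 20
        // Fixed test: 1 partition, 500 ms batches; expectation and reality agree.
        println(expectedRecords(100, 500, 1))  // 50
      }
    }

The larger payload matters for the same reason: 200 total messages at 20 records per 100 ms batch drain the topic after roughly ten batches, which is the "ran out of data" failure mode, whereas 5000 messages at 50 records per 500 ms batch leave ample headroom.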
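The polling-interval change is also easier to see in isolation. Below is a standalone sketch, assuming ScalaTest's Eventually and SpanSugar are on the classpath as they are in this suite; PollingSketch and its timing condition are illustrative only. Checking every 10 ms instead of once per batch interval means the assertion is evaluated many times within each batch, so a block of the expected size is far less likely to be missed between checks.

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object PollingSketch {
      def main(args: Array[String]): Unit = {
        val start = System.currentTimeMillis()
        // The block is retried every 10 ms until it passes or 5 s elapse,
        // mirroring eventually(timeout(5.seconds), interval(10 milliseconds))
        // in the patched test.
        eventually(timeout(5.seconds), interval(10.milliseconds)) {
          assert(System.currentTimeMillis() - start > 100, "not enough time elapsed yet")
        }
        println("condition met")
      }
    }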