spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
Repository: spark Updated Branches: refs/heads/branch-1.5 2239a2036 -> 88991dc4f [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa… …ult maxRatePerPartition setting of 0 Author: cody koeninger c...@koeninger.org Closes #8413 from koeninger/backpressure-testing-master. (cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4 Branch: refs/heads/branch-1.5 Commit: 88991dc4f04b0c88466c6eab5ada43506c981341 Parents: 2239a20 Author: cody koeninger c...@koeninger.org Authored: Mon Aug 24 23:26:14 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 24 23:26:27 2015 -0700 -- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707..194 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - .getOrElse(maxRateLimitPerPartition) + .map { limit => +if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) +} 
else { + limit / numPartitions +} + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
Repository: spark Updated Branches: refs/heads/master 5175ca0c8 -> d9c25dec8 [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa… …ult maxRatePerPartition setting of 0 Author: cody koeninger c...@koeninger.org Closes #8413 from koeninger/backpressure-testing-master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c25dec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c25dec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c25dec Branch: refs/heads/master Commit: d9c25dec87e6da7d66a47ff94e7eefa008081b9d Parents: 5175ca0 Author: cody koeninger c...@koeninger.org Authored: Mon Aug 24 23:26:14 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 24 23:26:14 2015 -0700 -- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d9c25dec/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707..194 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - .getOrElse(maxRateLimitPerPartition) + .map { limit => +if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) +} else { + limit / numPartitions +} + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = 
context.graph.batchDuration.milliseconds.toDouble / 1000 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org