Repository: spark Updated Branches: refs/heads/branch-1.5 2239a2036 -> 88991dc4f
[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with default maxRatePerPartition setting of 0 Author: cody koeninger <c...@koeninger.org> Closes #8413 from koeninger/backpressure-testing-master. (cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4 Branch: refs/heads/branch-1.5 Commit: 88991dc4f04b0c88466c6eab5ada43506c981341 Parents: 2239a20 Author: cody koeninger <c...@koeninger.org> Authored: Mon Aug 24 23:26:14 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Aug 24 23:26:27 2015 -0700 ---------------------------------------------------------------------- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707..1000094 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - 
.getOrElse(maxRateLimitPerPartition) + .map { limit => + if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) + } else { + limit / numPartitions + } + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org