Repository: spark
Updated Branches:
  refs/heads/master 5175ca0c8 -> d9c25dec8


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger <c...@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c25dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c25dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c25dec

Branch: refs/heads/master
Commit: d9c25dec87e6da7d66a47ff94e7eefa008081b9d
Parents: 5175ca0
Author: cody koeninger <c...@koeninger.org>
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 24 23:26:14 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d9c25dec/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..1000094 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / 
numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to