Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2239a2036 -> 88991dc4f


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger <c...@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.

(cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4

Branch: refs/heads/branch-1.5
Commit: 88991dc4f04b0c88466c6eab5ada43506c981341
Parents: 2239a20
Author: cody koeninger <c...@koeninger.org>
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 24 23:26:27 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..1000094 100644
--- 
a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / 
numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to