spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

2015-08-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 2239a2036 -> 88991dc4f


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger <c...@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.

(cherry picked from commit d9c25dec87e6da7d66a47ff94e7eefa008081b9d)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88991dc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88991dc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88991dc4

Branch: refs/heads/branch-1.5
Commit: 88991dc4f04b0c88466c6eab5ada43506c981341
Parents: 2239a20
Author: cody koeninger <c...@koeninger.org>
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 24 23:26:27 2015 -0700

--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88991dc4/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..194 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
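
For readers who want to see the effect of the hunk above in isolation, here is a minimal standalone sketch. It is not the Spark source: the object name BackpressureRateSketch, the two function names, and the use of Long and Int parameters are assumptions made for illustration. Only the body of each expression follows the removed and added lines of the diff. It assumes, as the commit title says, that a maxRatePerPartition of 0 is the default and means "no per-partition cap".

```scala
// Hypothetical standalone sketch; names and parameter types are assumptions,
// only the two expressions mirror the removed/added lines of the diff.
object BackpressureRateSketch {

  // Logic removed by the patch: the backpressure estimate is always passed
  // through Math.min with the configured maximum. When the maximum is 0,
  // the result is 0 and the estimate is discarded.
  def effectiveRateBefore(
      estimatedRateLimit: Option[Long],
      maxRateLimitPerPartition: Long,
      numPartitions: Int): Long =
    estimatedRateLimit
      .filter(_ > 0)
      .map(limit => Math.min(maxRateLimitPerPartition, limit / numPartitions))
      .getOrElse(maxRateLimitPerPartition)

  // Logic added by the patch: the maximum is applied only when it is
  // positive; otherwise the per-partition share of the estimate is used.
  def effectiveRateAfter(
      estimatedRateLimit: Option[Long],
      maxRateLimitPerPartition: Long,
      numPartitions: Int): Long =
    estimatedRateLimit
      .filter(_ > 0)
      .map { limit =>
        if (maxRateLimitPerPartition > 0) {
          Math.min(maxRateLimitPerPartition, limit / numPartitions)
        } else {
          limit / numPartitions
        }
      }
      .getOrElse(maxRateLimitPerPartition)

  def main(args: Array[String]): Unit = {
    // Estimate of 1000 records/sec over 4 partitions, maximum left at 0.
    println(effectiveRateBefore(Some(1000L), 0L, 4))
    println(effectiveRateAfter(Some(1000L), 0L, 4))
    // With an explicit maximum of 100 the cap still applies after the patch.
    println(effectiveRateAfter(Some(1000L), 100L, 4))
  }
}
```

With the maximum left at 0, the old expression returns 0 regardless of the estimate, which the subsequent `effectiveRateLimitPerPartition > 0` check treats as "no limit", so backpressure had no effect. The new expression returns the estimate divided across partitions.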


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

2015-08-25 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 5175ca0c8 -> d9c25dec8


[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger <c...@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9c25dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9c25dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9c25dec

Branch: refs/heads/master
Commit: d9c25dec87e6da7d66a47ff94e7eefa008081b9d
Parents: 5175ca0
Author: cody koeninger <c...@koeninger.org>
Authored: Mon Aug 24 23:26:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 24 23:26:14 2015 -0700

--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d9c25dec/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707..194 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000

