[GitHub] spark issue #19431: [SPARK-18580] [DSTREAM][KAFKA] Add spark.streaming.backp...

2018-03-21 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19431
  
@koeninger done! 


---




[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-03-21 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19431
  
@koeninger resolved the conflict


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-03-12 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r173719300
  
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Right, thank you. I will correct this.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395493
  
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, they were disabled for test files. Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395482
  
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+                               initialRate: Int,
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395492
  
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, they were disabled for test files. Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395487
  
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+                               initialRate: Int,
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242320
  
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242353
  
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242329
  
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate2"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242186
  
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-02-08 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19431
  
> Related the doc I thought it's kafka specific but it's not so fine like that

Yes, it was implemented only in the Kafka streams, but the doc doesn't limit usage of this parameter to Kafka.

> good to merge the common functionalities

Not sure I understood you correctly here. Do you mean in the tests?




---




[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-02-07 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19431
  
The latest rate is the rate of the previous batch. Is it possible that a live system processed 0 events? Only if there was no backlog and no new events arrived during the last batch, which is entirely possible.

It also happens on the first run, and this parameter is meant to limit the rate on the first run. Quoting the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If it happens while the system is running, for example when there is no backlog and no new events arrived, we still need to limit the rate: with a latest rate of 0 there is no limit at all, which risks overwhelming the system.
And if the cluster was so heavily loaded with other processes that Spark Streaming processed 0 events, we may have a huge backlog afterwards, which means that without this fix the system has a high chance of being overwhelmed.
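
For reference, the tests added in this PR assert exactly this arithmetic. A small sketch with illustrative values (the real computation lives in `maxMessagesPerPartition`):

```scala
// Illustrative values only, mirroring the tests in this PR.
val initialRate = 500L          // spark.streaming.backpressure.initialRate
val maxRatePerPartition = 1000L // spark.streaming.kafka.maxRatePerPartition
val batchIntervalMs = 500L      // the suites use Milliseconds(500)

// When the latest rate is 0 (e.g. on the first batch), the effective rate is
// the initial rate, capped by maxRatePerPartition when that is set.
val effectiveRate =
  if (maxRatePerPartition > 0) math.min(initialRate, maxRatePerPartition)
  else initialRate

// Messages allowed per partition in one batch: rate (msgs/sec) * interval (sec).
val maxMessages = effectiveRate * batchIntervalMs / 1000 // 500 * 0.5 = 250
```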


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166606906
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

If the cluster was so heavily loaded with other processes that Spark Streaming processed 0 events, we may have a huge backlog afterwards, which means that without this fix the system has a high chance of being overwhelmed.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605640
  
--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

The latest rate is the rate of the previous batch. Is it possible that a live system processed 0 events? Only if there was no backlog and no new events arrived during the last batch, which is entirely possible.

It also happens on the first run, and this parameter is meant to limit the rate on the first run. Quoting the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If it happens while the system is running, for example when there is no backlog and no new events arrived, we still need to limit the rate: with a latest rate of 0 there is no limit at all, which risks overwhelming the system.
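
To make the behavior concrete, here is a minimal self-contained sketch of the fallback (the `RateFallback` wrapper is hypothetical and for illustration only; in the PR the logic lives inline in `maxMessagesPerPartition`):

```scala
import org.apache.spark.SparkConf

// Hypothetical wrapper for illustration; field names follow the diff above.
class RateFallback(conf: SparkConf, getLatestRate: () => Long) {
  // 0 means "unset", matching the default used in the diff.
  private val initialRate: Long =
    conf.getLong("spark.streaming.backpressure.initialRate", 0)

  // Use the estimated rate once it is positive; before the first completed
  // batch (or whenever the estimator reports 0), fall back to the configured
  // initial rate instead of treating 0 as "no limit".
  def estimatedRateLimit: Long = {
    val latest = getLatestRate()
    if (latest > 0) latest else initialRate
  }
}
```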


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605671
  
--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> {
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605578
  
--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605547
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

The latest rate is the rate of the previous batch. Is it possible that a live system processed 0 events? Only if there was no backlog and no new events arrived during the last batch, which is entirely possible.

It also happens on the first run, and this parameter is meant to limit the rate on the first run. Quoting the docs:

> This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.

If it happens while the system is running, for example when there is no backlog and no new events arrived, we still need to limit the rate: with a latest rate of 0 there is no limit at all, which risks overwhelming the system.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166603416
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed


---




[GitHub] spark issue #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][external/...

2018-02-07 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19431
  
@gaborgsomogyi
`spark.streaming.backpressure.initialRate` is already documented here: https://spark.apache.org/docs/latest/configuration.html
But it was mistakenly never applied to the direct Kafka streams.


---




[GitHub] spark issue #19430: Spark 18580

2017-10-04 Thread akonopko
Github user akonopko commented on the issue:

https://github.com/apache/spark/pull/19430
  
@vanzin please take a look at https://github.com/apache/spark/pull/19431
I am closing this PR because it was mistakenly created against branch-2.2.


---




[GitHub] spark pull request #19430: Spark 18580

2017-10-04 Thread akonopko
Github user akonopko closed the pull request at:

https://github.com/apache/spark/pull/19430


---




[GitHub] spark pull request #19431: Add spark.streaming.backpressure.initialRate to d...

2017-10-04 Thread akonopko
GitHub user akonopko opened a pull request:

https://github.com/apache/spark/pull/19431

Add spark.streaming.backpressure.initialRate to direct Kafka streams



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akonopko/spark SPARK-18580-initialrate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19431.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19431






---




[GitHub] spark pull request #19430: Spark 18580

2017-10-04 Thread akonopko
GitHub user akonopko reopened a pull request:

https://github.com/apache/spark/pull/19430

Spark 18580

## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to the direct Kafka streams for Kafka 0.8 and 0.10.
This is required to be able to use backpressure with huge lags that cannot be processed at once. Without this parameter, `DirectKafkaInputDStream` with backpressure enabled would try to fetch all available data from Kafka before adjusting the consumption rate.
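
For illustration, enabling this end to end is plain configuration. A minimal sketch with made-up master and app name (the property keys are the documented ones):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Illustrative only: with a huge Kafka backlog, the initial rate caps the
// first batch instead of letting it try to pull the whole lag at once.
val conf = new SparkConf()
  .setMaster("local[2]")           // made-up master for the sketch
  .setAppName("initial-rate-demo") // made-up app name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "500")   // msgs/sec for the first batch
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // hard per-partition cap

val ssc = new StreamingContext(conf, Milliseconds(500))
```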

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on a YARN cluster

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akonopko/spark SPARK-18580

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19430.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19430






---




[GitHub] spark pull request #19430: Spark 18580

2017-10-04 Thread akonopko
Github user akonopko closed the pull request at:

https://github.com/apache/spark/pull/19430


---




[GitHub] spark pull request #19430: Spark 18580

2017-10-04 Thread akonopko
GitHub user akonopko opened a pull request:

https://github.com/apache/spark/pull/19430

Spark 18580

## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to the direct Kafka streams for Kafka 0.8 and 0.10.
This is required to be able to use backpressure with huge lags that cannot be processed at once. Without this parameter, `DirectKafkaInputDStream` with backpressure enabled would try to fetch all available data from Kafka before adjusting the consumption rate.

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala`
- Manual tests on a YARN cluster

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akonopko/spark SPARK-18580

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19430.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19430






---
