Repository: spark
Updated Branches:
  refs/heads/master 918c7e99a -> 2b89e4aa2


[SPARK-18580][DSTREAM][KAFKA] Add spark.streaming.backpressure.initialRate to direct Kafka streams

## What changes were proposed in this pull request?

Add `spark.streaming.backpressure.initialRate` to direct Kafka streams for Kafka 0.8 and 0.10.

This is required in order to use backpressure with huge lags that cannot be processed at once. Without this parameter, a `DirectKafkaInputDStream` with backpressure enabled would try to fetch all available data from Kafka in the first batches, before the rate estimator has produced a rate and the consumption rate is adjusted.
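
For illustration only (not part of this patch), a minimal sketch of how an application would opt in with the Kafka 0.10 direct API; the master, broker address, topic, group id and rate values below are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

// Placeholder app/master/broker/topic values; the config keys are the ones this patch reads.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("InitialRateExample")
  .set("spark.streaming.backpressure.enabled", "true")
  // Caps the first batches until the rate estimator produces a rate > 0.
  .set("spark.streaming.backpressure.initialRate", "500")
  // Hard per-partition ceiling that still applies on top of backpressure.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "earliest")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams))

stream.map(record => (record.key, record.value)).print()
ssc.start()
ssc.awaitTermination()
```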

## How was this patch tested?

- Tests added to `org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala` and `org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala` (the expected per-partition caps are worked out below)
- Manual tests on YARN cluster
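
As a rough sanity check on the expected values (assuming the usual scaling of `maxMessagesPerPartition` by batch duration): with a 500 ms batch the per-partition cap works out to min(initialRate, maxRatePerPartition) × 0.5, i.e. min(500, 1000) × 0.5 = 250 messages and min(1000, 300) × 0.5 = 150 messages, which is what the new assertions expect.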

Author: akonopko <alex.kono...@teamaol.com>
Author: Alexander Konopko <alexander.kono...@gmail.com>

Closes #19431 from akonopko/SPARK-18580-initialrate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b89e4aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b89e4aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b89e4aa

Branch: refs/heads/master
Commit: 2b89e4aa2e8bd8b88f6e5eb60d95c1a58e5c4ace
Parents: 918c7e9
Author: akonopko <alex.kono...@teamaol.com>
Authored: Wed Mar 21 14:40:21 2018 -0500
Committer: cody koeninger <c...@koeninger.org>
Committed: Wed Mar 21 14:40:21 2018 -0500

----------------------------------------------------------------------
 .../kafka010/DirectKafkaInputDStream.scala      |  8 ++-
 .../kafka010/DirectKafkaStreamSuite.scala       | 51 ++++++++++++++++-
 .../kafka/DirectKafkaInputDStream.scala         |  9 ++-
 .../kafka/DirectKafkaStreamSuite.scala          | 59 +++++++++++++++++++-
 4 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b89e4aa/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 9cb2448..215b7ca 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -56,6 +56,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
     ppc: PerPartitionConfig
   ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   val executorKafkaParams = {
     val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
     KafkaUtils.fixKafkaParams(ekp)
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map { x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
+    }}
 
     // calculate a per-partition rate limit based on current lag
     val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b89e4aa/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 8524743..35e4678 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010
 
 import java.io.File
 import java.lang.{ Long => JLong }
-import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
+import java.util.{ Arrays, HashMap => JHashMap, Map => JMap, UUID }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
 
@@ -34,7 +34,7 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
@@ -617,6 +617,53 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(
+      maxRatePerPartition: Int,
+      initialRate: Int,
+      maxMessagesPerPartition: Int) = {
+
+    val topic = UUID.randomUUID().toString
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", initialRate.toString)
+      .set("spark.streaming.kafka.maxRatePerPartition", 
maxRatePerPartition.toString)
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(new TopicPartition(topic, 0) -> maxMessagesPerPartition)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
   test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = getKafkaParams()

http://git-wip-us.apache.org/repos/asf/spark/blob/2b89e4aa/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index d6dd074..9297c39 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map { x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
+    }}
 
     // calculate a per-partition rate limit based on current lag
     val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b89e4aa/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 3fea6cf..ecca387 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.File
-import java.util.Arrays
+import java.util.{ Arrays, UUID }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
 
@@ -32,12 +32,11 @@ import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 import org.apache.spark.util.Utils
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(
+      maxRatePerPartition: Int,
+      initialRate: Int,
+      maxMessagesPerPartition: Int) = {
+
+    val topic = UUID.randomUUID().toString
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", initialRate.toString)
+      .set("spark.streaming.kafka.maxRatePerPartition", 
maxRatePerPartition.toString)
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(new TopicAndPartition(topic, 0) -> maxMessagesPerPartition))
+
+    kafkaStream.stop()
+  }
+
   test("maxMessagesPerPartition with zero offset and rate equal to one") {
     val topic = "backpressure"
     val kafkaParams = Map(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
