Repository: spark
Updated Branches:
  refs/heads/master a6e2bd31f -> f19228eed


[SPARK-12073][STREAMING] backpressure rate controller consumes events
preferentially from lagging partitions

I'm pretty sure this is the reason we couldn't easily recover from an 
unbalanced Kafka partition under heavy load when using backpressure.

`maxMessagesPerPartition` calculates an appropriate limit for the total
message rate across all partitions, and then divides by the number of
partitions to determine how many messages to retrieve from each. The problem
with this approach is that when one partition is behind by millions of
records (due to random Kafka issues), but the rate estimator calculates that
only 100k total messages can be retrieved, each partition (out of, say, 32)
retrieves at most 100k/32 = 3125 messages.
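
For illustration, a minimal standalone sketch of that even split (not the
actual Spark code; `uniformLimit` is a hypothetical helper):

    // Old approach (sketch): divide the estimated total rate evenly,
    // ignoring how far behind any individual partition is.
    def uniformLimit(estimatedRate: Long, numPartitions: Int): Long =
      estimatedRate / numPartitions

    uniformLimit(100000L, 32)  // = 3125 messages for every partition, lagging or not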

This PR (still needing a test) determines a per-partition desired message
count by using each partition's current lag to weight the total message limit
preferentially toward the lagging partitions. In the situation above, if every
partition is 1k messages behind but one partition is a further 1M behind, the
total lag is (32 * 1k + 1M) = 1,032,000 messages, of which the lagging
partition accounts for 1,001,000. So it gets (1001000 / 1032000) ≈ 97% of the
100k messages, and the other 31 partitions share the remaining 3%.
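
A minimal sketch of that lag-weighted split (hypothetical helper, partition
ids simplified to strings; the real change is in
DirectKafkaInputDStream.maxMessagesPerPartition in the diff below):

    // Weight the estimated total rate by each partition's share of the total lag.
    def weightedLimits(lagPerPartition: Map[String, Long], totalRate: Long): Map[String, Long] = {
      val totalLag = lagPerPartition.values.sum
      lagPerPartition.map { case (tp, lag) =>
        tp -> Math.round(lag / totalLag.toDouble * totalRate)
      }
    }

    // 31 partitions 1k behind plus one partition 1,001,000 behind, rate estimate 100k:
    // the lagging partition is allotted ~97k messages, the other 31 share the remaining ~3k.
    val lags = (0 until 31).map(i => s"topic-$i" -> 1000L).toMap + ("topic-31" -> 1001000L)
    weightedLimits(lags, 100000L)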

Assuming all 100k of those messages are retrieved and processed within the
batch window, the rate estimator will increase the number of messages to
retrieve in the next batch, until it reaches a new stable point or the
backlog is finished processing.

We're going to try deploying this internally at Shopify to see if this resolves 
our issue.

tdas koeninger holdenk

Author: Jason White <jason.wh...@shopify.com>

Closes #10089 from JasonMWhite/rate_controller_offsets.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f19228ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f19228ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f19228ee

Branch: refs/heads/master
Commit: f19228eed89cf8e22a07a7ef7f37a5f6f8a3d455
Parents: a6e2bd3
Author: Jason White <jason.wh...@shopify.com>
Authored: Fri Mar 4 16:04:56 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Mar 4 16:04:56 2016 -0800

----------------------------------------------------------------------
 .../kafka/DirectKafkaInputDStream.scala         | 44 ++++++++-----
 .../spark/streaming/kafka/KafkaTestUtils.scala  |  9 ++-
 .../kafka/JavaDirectKafkaStreamSuite.java       |  2 +-
 .../streaming/kafka/JavaKafkaRDDSuite.java      |  2 +-
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  2 +-
 .../kafka/DirectKafkaStreamSuite.scala          | 68 +++++++++++++++++---
 project/MimaExcludes.scala                      |  4 ++
 7 files changed, 101 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 54d8c8b..0eaaf40 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -89,23 +89,32 @@ class DirectKafkaInputDStream[
 
   private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
       "spark.streaming.kafka.maxRatePerPartition", 0)
-  protected def maxMessagesPerPartition: Option[Long] = {
+
+  protected[streaming] def maxMessagesPerPartition(
+      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
     val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
-    val numPartitions = currentOffsets.keys.size
-
-    val effectiveRateLimitPerPartition = estimatedRateLimit
-      .filter(_ > 0)
-      .map { limit =>
-        if (maxRateLimitPerPartition > 0) {
-          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
-        } else {
-          limit / numPartitions
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+      case Some(rate) =>
+        val lagPerPartition = offsets.map { case (tp, offset) =>
+          tp -> Math.max(offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum
+
+        lagPerPartition.map { case (tp, lag) =>
+          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          tp -> (if (maxRateLimitPerPartition > 0) {
+            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      }.getOrElse(maxRateLimitPerPartition)
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+    }
 
-    if (effectiveRateLimitPerPartition > 0) {
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-      Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
     } else {
       None
     }
@@ -134,9 +143,12 @@ class DirectKafkaInputDStream[
   // limits the maximum number of messages per partition
   protected def clamp(
     leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
-    maxMessagesPerPartition.map { mmp =>
-      leaderOffsets.map { case (tp, lo) =>
-        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+    val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+        val lo = leaderOffsets(tp)
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
       }
     }.getOrElse(leaderOffsets)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index a76fa66..a5ea1d6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -152,12 +152,15 @@ private[kafka] class KafkaTestUtils extends Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole cluster */
-  def createTopic(topic: String): Unit = {
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkClient, topic, partitions, 1)
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(topic, 0)
+    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
   }
 
+  /** Single-argument version for backwards compatibility */
+  def createTopic(topic: String): Unit = createTopic(topic, 1)
+
   /** Java-friendly function for sending messages to the Kafka broker */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
     sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 4891e4f..fa6b0db 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -168,7 +168,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
 
   private  String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index afcc6cf..c41b629 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -149,7 +149,7 @@ public class JavaKafkaRDDSuite implements Serializable {
 
   private  String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, data);
     return data;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 617c92a..868df64 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -76,7 +76,7 @@ public class JavaKafkaStreamSuite implements Serializable {
     sent.put("b", 3);
     sent.put("c", 10);
 
-    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.createTopic(topic, 1);
     kafkaTestUtils.sendMessages(topic, sent);
 
     Map<String, String> kafkaParams = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 8398178..b2c81d1 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -353,10 +353,38 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("maxMessagesPerPartition with backpressure disabled") {
+    val topic = "maxMessagesPerPartition"
+    val kafkaStream = getDirectKafkaStream(topic, None)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
+  test("maxMessagesPerPartition with no lag") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+  }
+
+  test("maxMessagesPerPartition respects max rate") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
   test("using rate controller") {
     val topic = "backpressure"
-    val topicPartition = TopicAndPartition(topic, 0)
-    kafkaTestUtils.createTopic(topic)
+    val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
+    kafkaTestUtils.createTopic(topic, 2)
     val kafkaParams = Map(
       "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "auto.offset.reset" -> "smallest"
@@ -364,8 +392,8 @@ class DirectKafkaStreamSuite
 
     val batchIntervalMilliseconds = 100
     val estimator = new ConstantEstimator(100)
-    val messageKeys = (1 to 200).map(_.toString)
-    val messages = messageKeys.map((_, 1)).toMap
+    val messages = Map("foo" -> 200)
+    kafkaTestUtils.sendMessages(topic, messages)
 
     val sparkConf = new SparkConf()
       // Safe, even with streaming, because we're using the direct API.
@@ -380,11 +408,11 @@ class DirectKafkaStreamSuite
     val kafkaStream = withClue("Error creating direct stream") {
       val kc = new KafkaCluster(kafkaParams)
       val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
-      val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
         .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
 
       new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
-        ssc, kafkaParams, m, messageHandler) {
+          ssc, kafkaParams, m, messageHandler) {
         override protected[streaming] val rateController =
           Some(new DirectKafkaRateController(id, estimator))
       }
@@ -405,13 +433,12 @@ class DirectKafkaStreamSuite
     ssc.start()
 
     // Try different rate limits.
-    // Send data to Kafka and wait for arrays of data to appear matching the rate.
+    // Wait for arrays of data to appear matching the rate.
     Seq(100, 50, 20).foreach { rate =>
       collectedData.clear()       // Empty this buffer on each pass.
       estimator.updateRate(rate)  // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      kafkaTestUtils.sendMessages(topic, messages)
       eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
@@ -430,6 +457,25 @@ class DirectKafkaStreamSuite
       rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
     }.toSeq.sortBy { _._1 }
   }
+
+  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+    val batchIntervalMilliseconds = 100
+
+    val sparkConf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+      ssc, Map[String, String](), earliestOffsets, messageHandler) {
+      override protected[streaming] val rateController = mockRateController
+    }
+  }
 }
 
 object DirectKafkaStreamSuite {
@@ -468,3 +514,9 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
       processingDelay: Long,
       schedulingDelay: Long): Option[Double] = Some(rate)
 }
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+  extends RateController(id, estimator) {
+  override def publish(rate: Long): Unit = ()
+  override def getLatestRate(): Long = rate
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f19228ee/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9ce37fc..983f716 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -288,6 +288,10 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry$")
+      ) ++ Seq(
+        // SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
       )
     case v if v.startsWith("1.6") =>
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
