Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5bbb2d327 -> 612b4609b


[SPARK-9966] [STREAMING] Handle couple of corner cases in PIDRateEstimator

1. The rate estimator should not estimate any rate when there are no records
in the batch, as there is no data from which to estimate a rate. In the
current state, it estimates the rate and sets it to zero, which is incorrect.

2. The rate estimator should never set the rate to zero under any
circumstances. Otherwise the system will stop receiving data, and stop
generating useful estimates (see reason 1). The fix is to define a
parameter that sets a lower bound on the estimated rate, so that the
system always receives some data.
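
For reference, the fix exposes the lower bound as the configuration key
spark.streaming.backpressure.pid.minRate, defaulting to 100 records per second
(see the RateEstimator.scala hunk below). The following is a minimal sketch of
tuning it from an application, assuming a Spark 1.5 streaming job;
spark.streaming.backpressure.enabled is the standard switch for the
backpressure mechanism, and the app name, master, and batch interval are
placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch: enable PID-based backpressure and override the estimator's
    // lower bound (default 100 records/sec; must be > 0 or the estimator's
    // constructor will reject it).
    val conf = new SparkConf()
      .setAppName("BackpressureMinRateExample") // placeholder
      .setMaster("local[2]")                    // placeholder
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.pid.minRate", "10")

    val ssc = new StreamingContext(conf, Seconds(1))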

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8199 from tdas/SPARK-9966 and squashes the following commits:

829f793 [Tathagata Das] Fixed unit test and added comments
3a994db [Tathagata Das] Added min rate and updated tests in PIDRateEstimator

(cherry picked from commit f3bfb711c1742d0915e43bda8230b4d1d22b4190)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/612b4609
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/612b4609
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/612b4609

Branch: refs/heads/branch-1.5
Commit: 612b4609bdd38763725ae07d77c2176aa6756e64
Parents: 5bbb2d3
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Aug 14 15:10:01 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Aug 14 15:10:19 2015 -0700

----------------------------------------------------------------------
 .../scheduler/rate/PIDRateEstimator.scala       | 46 +++++++++---
 .../scheduler/rate/RateEstimator.scala          |  4 +-
 .../scheduler/rate/PIDRateEstimatorSuite.scala  | 79 +++++++++++++-------
 3 files changed, 87 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/612b4609/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
index 6ae56a6..84a3ca9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.scheduler.rate
 
+import org.apache.spark.Logging
+
 /**
  * Implements a proportional-integral-derivative (PID) controller which acts on
 * the speed of ingestion of elements into Spark Streaming. A PID controller works
@@ -26,7 +28,7 @@ package org.apache.spark.streaming.scheduler.rate
  *
  * @see https://en.wikipedia.org/wiki/PID_controller
  *
- * @param batchDurationMillis the batch duration, in milliseconds
+ * @param batchIntervalMillis the batch duration, in milliseconds
  * @param proportional how much the correction should depend on the current
 *        error. This term usually provides the bulk of correction and should be positive or zero.
 *        A value too large would make the controller overshoot the setpoint, while a small value
@@ -39,13 +41,17 @@ package org.apache.spark.streaming.scheduler.rate
 *        of future errors, based on current rate of change. This value should be positive or 0.
 *        This term is not used very often, as it impacts stability of the system. The default
 *        value is 0.
+ * @param minRate what is the minimum rate that can be estimated.
+ *        This must be greater than zero, so that the system always receives some data for rate
+ *        estimation to work.
  */
 private[streaming] class PIDRateEstimator(
     batchIntervalMillis: Long,
-    proportional: Double = 1D,
-    integral: Double = .2D,
-    derivative: Double = 0D)
-  extends RateEstimator {
+    proportional: Double,
+    integral: Double,
+    derivative: Double,
+    minRate: Double
+  ) extends RateEstimator with Logging {
 
   private var firstRun: Boolean = true
   private var latestTime: Long = -1L
@@ -64,16 +70,23 @@ private[streaming] class PIDRateEstimator(
   require(
     derivative >= 0,
     s"Derivative term $derivative in PIDRateEstimator should be >= 0.")
+  require(
+    minRate > 0,
+    s"Minimum rate in PIDRateEstimator should be > 0")
 
+  logInfo(s"Created PIDRateEstimator with proportional = $proportional, 
integral = $integral, " +
+    s"derivative = $derivative, min rate = $minRate")
 
-  def compute(time: Long, // in milliseconds
+  def compute(
+      time: Long, // in milliseconds
       numElements: Long,
       processingDelay: Long, // in milliseconds
       schedulingDelay: Long // in milliseconds
     ): Option[Double] = {
-
+    logTrace(s"\ntime = $time, # records = $numElements, " +
+      s"processing time = $processingDelay, scheduling delay = 
$schedulingDelay")
     this.synchronized {
-      if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) {
+      if (time > latestTime && numElements > 0 && processingDelay > 0) {
 
         // in seconds, should be close to batchDuration
         val delaySinceUpdate = (time - latestTime).toDouble / 1000
@@ -104,21 +117,30 @@ private[streaming] class PIDRateEstimator(
 
         val newRate = (latestRate - proportional * error -
                                     integral * historicalError -
-                                    derivative * dError).max(0.0)
+                                    derivative * dError).max(minRate)
+        logTrace(s"""
+            | latestRate = $latestRate, error = $error
+            | latestError = $latestError, historicalError = $historicalError
+            | delaySinceUpdate = $delaySinceUpdate, dError = $dError
+            """.stripMargin)
+
         latestTime = time
         if (firstRun) {
           latestRate = processingRate
           latestError = 0D
           firstRun = false
-
+          logTrace("First run, rate estimation skipped")
           None
         } else {
           latestRate = newRate
           latestError = error
-
+          logTrace(s"New rate = $newRate")
           Some(newRate)
         }
-      } else None
+      } else {
+        logTrace("Rate estimation skipped")
+        None
+      }
     }
   }
 }
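
To make the two corner cases concrete, here is an illustrative, self-contained
sketch (not the actual private[streaming] class above) of a simplified,
proportional-only update that skips estimation for empty batches and clamps
the result at a minimum rate:

    // Simplified stand-in for the patched compute(): returns None when there
    // is nothing to estimate from (corner case 1) and never returns a rate
    // below minRate (corner case 2). Only the proportional term is modeled.
    def estimate(
        latestRate: Double,
        numElements: Long,
        processingDelayMs: Long,
        proportional: Double,
        minRate: Double): Option[Double] = {
      if (numElements > 0 && processingDelayMs > 0) {
        // records/sec at which the last batch was actually processed
        val processingRate = numElements.toDouble / processingDelayMs * 1000
        val error = latestRate - processingRate
        // clamping at minRate keeps some data flowing, so later batches still
        // carry information the estimator can use (a zero rate would not)
        Some(math.max(latestRate - proportional * error, minRate))
      } else {
        None
      }
    }

    // e.g. an empty batch now yields no estimate instead of a zero rate:
    // estimate(1000.0, 0L, 20L, 1.0, 100.0) == None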

http://git-wip-us.apache.org/repos/asf/spark/blob/612b4609/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index 17ccebc..d7210f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.scheduler.rate
 
 import org.apache.spark.SparkConf
-import org.apache.spark.SparkException
 import org.apache.spark.streaming.Duration
 
 /**
@@ -61,7 +60,8 @@ object RateEstimator {
         val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
         val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
         val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
-        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived)
+        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
+        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
 
       case estimator =>
         throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")

http://git-wip-us.apache.org/repos/asf/spark/blob/612b4609/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
index 97c32d8..a1af95be 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
@@ -36,72 +36,89 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
 
   test("estimator checks ranges") {
     intercept[IllegalArgumentException] {
-      new PIDRateEstimator(0, 1, 2, 3)
+      new PIDRateEstimator(batchIntervalMillis = 0, 1, 2, 3, 10)
     }
     intercept[IllegalArgumentException] {
-      new PIDRateEstimator(100, -1, 2, 3)
+      new PIDRateEstimator(100, proportional = -1, 2, 3, 10)
     }
     intercept[IllegalArgumentException] {
-      new PIDRateEstimator(100, 0, -1, 3)
+      new PIDRateEstimator(100, 0, integral = -1, 3, 10)
     }
     intercept[IllegalArgumentException] {
-      new PIDRateEstimator(100, 0, 0, -1)
+      new PIDRateEstimator(100, 0, 0, derivative = -1, 10)
+    }
+    intercept[IllegalArgumentException] {
+      new PIDRateEstimator(100, 0, 0, 0, minRate = 0)
+    }
+    intercept[IllegalArgumentException] {
+      new PIDRateEstimator(100, 0, 0, 0, minRate = -10)
     }
   }
 
-  private def createDefaultEstimator: PIDRateEstimator = {
-    new PIDRateEstimator(20, 1D, 0D, 0D)
-  }
-
-  test("first bound is None") {
-    val p = createDefaultEstimator
+  test("first estimate is None") {
+    val p = createDefaultEstimator()
     p.compute(0, 10, 10, 0) should equal(None)
   }
 
-  test("second bound is rate") {
-    val p = createDefaultEstimator
+  test("second estimate is not None") {
+    val p = createDefaultEstimator()
     p.compute(0, 10, 10, 0)
     // 1000 elements / s
     p.compute(10, 10, 10, 0) should equal(Some(1000))
   }
 
-  test("works even with no time between updates") {
-    val p = createDefaultEstimator
+  test("no estimate when no time difference between successive calls") {
+    val p = createDefaultEstimator()
+    p.compute(0, 10, 10, 0)
+    p.compute(time = 10, 10, 10, 0) shouldNot equal(None)
+    p.compute(time = 10, 10, 10, 0) should equal(None)
+  }
+
+  test("no estimate when no records in previous batch") {
+    val p = createDefaultEstimator()
     p.compute(0, 10, 10, 0)
-    p.compute(10, 10, 10, 0)
-    p.compute(10, 10, 10, 0) should equal(None)
+    p.compute(10, numElements = 0, 10, 0) should equal(None)
+    p.compute(20, numElements = -10, 10, 0) should equal(None)
   }
 
-  test("bound is never negative") {
-    val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+  test("no estimate when there is no processing delay") {
+    val p = createDefaultEstimator()
+    p.compute(0, 10, 10, 0)
+    p.compute(10, 10, processingDelay = 0, 0) should equal(None)
+    p.compute(20, 10, processingDelay = -10, 0) should equal(None)
+  }
+
+  test("estimate is never less than min rate") {
+    val minRate = 5D
+    val p = new PIDRateEstimator(20, 1D, 1D, 0D, minRate)
     // prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing
     // this might point the estimator to try and decrease the bound, but we test it never
-    // goes below zero, which would be nonsensical.
+    // goes below the min rate, which would be nonsensical.
     val times = List.tabulate(50)(x => x * 20) // every 20ms
-    val elements = List.fill(50)(0) // no processing
+    val elements = List.fill(50)(1) // no processing
     val proc = List.fill(50)(20) // 20ms of processing
     val sched = List.fill(50)(100) // strictly positive accumulation
     val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
     res.head should equal(None)
-    res.tail should equal(List.fill(49)(Some(0D)))
+    res.tail should equal(List.fill(49)(Some(minRate)))
   }
 
   test("with no accumulated or positive error, |I| > 0, follow the processing 
speed") {
-    val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+    val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
     // prepare a series of batch updates, one every 20ms with an increasing number of processed
     // elements in each batch, but constant processing time, and no accumulated error. Even though
     // the integral part is non-zero, the estimated rate should follow only the proportional term
     val times = List.tabulate(50)(x => x * 20) // every 20ms
-    val elements = List.tabulate(50)(x => x * 20) // increasing
+    val elements = List.tabulate(50)(x => (x + 1) * 20) // increasing
     val proc = List.fill(50)(20) // 20ms of processing
     val sched = List.fill(50)(0)
     val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
     res.head should equal(None)
-    res.tail should equal(List.tabulate(50)(x => Some(x * 1000D)).tail)
+    res.tail should equal(List.tabulate(50)(x => Some((x + 1) * 1000D)).tail)
   }
 
   test("with no accumulated but some positive error, |I| > 0, follow the 
processing speed") {
-    val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+    val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
     // prepare a series of batch updates, one every 20ms with an decreasing number of processed
     // elements in each batch, but constant processing time, and no accumulated error. Even though
     // the integral part is non-zero, the estimated rate should follow only the proportional term,
@@ -116,13 +133,14 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
   }
 
   test("with some accumulated and some positive error, |I| > 0, stay below the 
processing speed") {
-    val p = new PIDRateEstimator(20, 1D, .01D, 0D)
+    val minRate = 10D
+    val p = new PIDRateEstimator(20, 1D, .01D, 0D, minRate)
     val times = List.tabulate(50)(x => x * 20) // every 20ms
     val rng = new Random()
-    val elements = List.tabulate(50)(x => rng.nextInt(1000))
+    val elements = List.tabulate(50)(x => rng.nextInt(1000) + 1000)
     val procDelayMs = 20
     val proc = List.fill(50)(procDelayMs) // 20ms of processing
-    val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait
+    val sched = List.tabulate(50)(x => rng.nextInt(19) + 1) // random wait
     val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000)
 
     val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
@@ -131,7 +149,12 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
       res(n) should not be None
       if (res(n).get > 0 && sched(n) > 0) {
         res(n).get should be < speeds(n)
+        res(n).get should be >= minRate
       }
     }
   }
+
+  private def createDefaultEstimator(): PIDRateEstimator = {
+    new PIDRateEstimator(20, 1D, 0D, 0D, 10)
+  }
 }

