Repository: spark
Updated Branches:
  refs/heads/branch-1.3 8549ff4f0 -> 948f2f635


[SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError

A truncation in integer division (at rates over 1000 messages / second) 
causes the existing implementation to sleep for 0 milliseconds and then call 
itself recursively. This is effectively an infinite recursion, because the 
base case (the calculated amount of time having elapsed) cannot be reached 
before the available stack space is exhausted. A fix for this truncation 
error is included in this patch.

However, even with the defect patched, the accuracy of the existing 
implementation is abysmal (the error bounds of the original test were 
effectively [-30%, +10%], although this fact was obscured by hard-coded error 
margins). When the error bounds were tightened to [-5%, +5%], the existing 
implementation failed to meet the new requirements. Therefore, an 
industry-vetted solution (Guava's RateLimiter) was used to get the adapted 
tests to pass.

Author: David McGuire <david.mcgui...@nike.com>

Closes #5559 from dmcguire81/master and squashes the following commits:

d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing
8be6934 [David McGuire] Fix spacing per code review
90e98b9 [David McGuire] Address scalastyle errors
29011bd [David McGuire] Further ratchet down the error margins
b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator
8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code
70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent
82ee46d [David McGuire] Replace guard clause with nested conditional
2794717 [David McGuire] Replace the RateLimiter with the Guava implementation
38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail
24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion
d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s

(cherry picked from commit 5fea3e5c36450658d8b767dd3c06dac2251a0e0c)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/948f2f63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/948f2f63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/948f2f63

Branch: refs/heads/branch-1.3
Commit: 948f2f635d17cda0b8e1ef48c467ebb807ea1a3d
Parents: 8549ff4
Author: David McGuire <david.mcgui...@nike.com>
Authored: Tue Apr 21 07:21:10 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Apr 21 07:23:00 2015 -0400

----------------------------------------------------------------------
 .../spark/streaming/receiver/RateLimiter.scala  | 33 +++-----------------
 .../apache/spark/streaming/ReceiverSuite.scala  | 29 ++++++++++-------
 2 files changed, 21 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/948f2f63/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index e4f6ba6..97db9de 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.receiver
 
 import org.apache.spark.{Logging, SparkConf}
-import java.util.concurrent.TimeUnit._
+import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
 
 /** Provides waitToPush() method to limit the rate at which receivers consume data.
   *
@@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._
   */
 private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
 
-  private var lastSyncTime = System.nanoTime
-  private var messagesWrittenSinceSync = 0L
   private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
-  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
 
   def waitToPush() {
-    if( desiredRate <= 0 ) {
-      return
-    }
-    val now = System.nanoTime
-    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
-    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
-    if (rate < desiredRate) {
-      // It's okay to write; just update some variables and return
-      messagesWrittenSinceSync += 1
-      if (now > lastSyncTime + SYNC_INTERVAL) {
-        // Sync interval has passed; let's resync
-        lastSyncTime = now
-        messagesWrittenSinceSync = 1
-      }
-    } else {
-      // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
-      val elapsedTimeInMillis = elapsedNanosecs / 1000000
-      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
-      if (sleepTimeInMillis > 0) {
-        logTrace("Natural rate is " + rate + " per second but desired rate is " +
-          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
-        Thread.sleep(sleepTimeInMillis)
-      }
-      waitToPush()
+    if (desiredRate > 0) {
+      rateLimiter.acquire()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/948f2f63/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index e8c34a9..5b37de1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
   test("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
     val blockInterval = 100
-    val maxRate = 100
+    val maxRate = 1001
     val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
       set("spark.streaming.receiver.maxRate", maxRate.toString)
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -176,7 +176,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
       blockGenerator.addData(count)
       generatedData += count
       count += 1
-      Thread.sleep(1)
     }
     blockGenerator.stop()
 
@@ -185,25 +184,31 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
     assert(recordedData.toSet === generatedData.toSet, "Received data not same")
 
-    // recordedData size should be close to the expected rate
-    val minExpectedMessages = expectedMessages - 3
-    val maxExpectedMessages = expectedMessages + 1
+    // recordedData size should be close to the expected rate; use an error margin proportional to
+    // the value, so that rate changes don't cause a brittle test
+    val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
+    val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
     val numMessages = recordedData.size
     assert(
       numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
       s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
     )
 
-    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
-    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+    // XXX Checking every block would require an even distribution of messages across blocks,
+    // which throttling code does not control. Therefore, test against the average.
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
     val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+
+    // the first and last block may be incomplete, so we slice them out
+    val validBlocks = recordedBlocks.drop(1).dropRight(1)
+    val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
+
     assert(
-      // the first and last block may be incomplete, so we slice them out
-      recordedBlocks.drop(1).dropRight(1).forall { block =>
-        block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
-      },
+      averageBlockSize >= minExpectedMessagesPerBlock &&
+        averageBlockSize <= maxExpectedMessagesPerBlock,
       s"# records in received blocks = [$receivedBlockSizes], not between " +
-        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to