Repository: spark
Updated Branches:
  refs/heads/master f3bfb711c -> 18a761ef7


[SPARK-9968] [STREAMING] Reduced time spent within synchronized block to 
prevent lock starvation

When the rate limiter is actually limiting the rate at which data is inserted 
into the buffer, the synchronized block of BlockGenerator.addData stays blocked 
for a long time. This causes the thread switching the buffer and generating 
blocks (synchronized with addData) to starve and not generate blocks for 
seconds. The correct solution is to not block on the rate limiter within the 
synchronized block for adding data to the buffer.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8204 from tdas/SPARK-9968 and squashes the following commits:

8cbcc1b [Tathagata Das] Removed unused val
a73b645 [Tathagata Das] Reduced time spent within synchronized block


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18a761ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18a761ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18a761ef

Branch: refs/heads/master
Commit: 18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67
Parents: f3bfb71
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Aug 14 15:54:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Aug 14 15:54:14 2015 -0700

----------------------------------------------------------------------
 .../streaming/receiver/BlockGenerator.scala     | 40 ++++++++++++++++----
 1 file changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/18a761ef/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 794dece..300e820 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -155,10 +155,17 @@ private[streaming] class BlockGenerator(
   /**
    * Push a single data item into the buffer.
    */
-  def addData(data: Any): Unit = synchronized {
+  def addData(data: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")
@@ -169,11 +176,18 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. After buffering the data, the
    * `BlockGeneratorListener.onAddData` callback will be called.
    */
-  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+  def addDataWithCallback(data: Any, metadata: Any): Unit = {
     if (state == Active) {
       waitToPush()
-      currentBuffer += data
-      listener.onAddData(data, metadata)
+      synchronized {
+        if (state == Active) {
+          currentBuffer += data
+          listener.onAddData(data, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
+      }
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")
@@ -185,13 +199,23 @@ private[streaming] class BlockGenerator(
    * `BlockGeneratorListener.onAddData` callback will be called. Note that all 
the data items
    * are atomically added to the buffer, and are hence guaranteed to be 
present in a single block.
    */
-  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): 
Unit = synchronized {
+  def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): 
Unit = {
     if (state == Active) {
+      // Unroll iterator into a temp buffer, and wait for pushing in the 
process
+      val tempBuffer = new ArrayBuffer[Any]
       dataIterator.foreach { data =>
         waitToPush()
-        currentBuffer += data
+        tempBuffer += data
+      }
+      synchronized {
+        if (state == Active) {
+          currentBuffer ++= tempBuffer
+          listener.onAddData(tempBuffer, metadata)
+        } else {
+          throw new SparkException(
+            "Cannot add data as BlockGenerator has not been started or has 
been stopped")
+        }
       }
-      listener.onAddData(dataIterator, metadata)
     } else {
       throw new SparkException(
         "Cannot add data as BlockGenerator has not been started or has been 
stopped")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to