Repository: spark
Updated Branches:
  refs/heads/master b4b35f133 -> 1aeae05bb


[SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of 
generated blocks fills up to capacity

Generated blocks are inserted into an ArrayBlockingQueue, and another thread 
pulls stuff from the ArrayBlockingQueue and pushes it into BlockManager. Now if 
that queue fills up to capacity (default is 10 blocks), then the insertion into 
the queue (done in the function updateCurrentBuffer) gets blocked inside a 
synchronized block. However, the thread that is pulling blocks from the queue 
uses the same lock to check the current state (active or stopped) while pulling 
from the queue. Since the block-generating thread is blocked (as the queue is 
full) on the lock, the thread that is supposed to drain the queue gets blocked. 
Ergo, deadlock.

Solution: Moved the blocking call to ArrayBlockingQueue outside the synchronized 
block to prevent the deadlock.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #8257 from tdas/SPARK-10072.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1aeae05b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1aeae05b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1aeae05b

Branch: refs/heads/master
Commit: 1aeae05bb20f01ab7ccaa62fe905a63e020074b5
Parents: b4b35f1
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Aug 18 19:26:38 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Aug 18 19:26:38 2015 -0700

----------------------------------------------------------------------
 .../streaming/receiver/BlockGenerator.scala     | 29 +++++++++++++-------
 1 file changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1aeae05b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 300e820..421d60a 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -227,16 +227,21 @@ private[streaming] class BlockGenerator(
   def isStopped(): Boolean = state == StoppedAll
 
   /** Change the buffer to which single records are added to. */
-  private def updateCurrentBuffer(time: Long): Unit = synchronized {
+  private def updateCurrentBuffer(time: Long): Unit = {
     try {
-      val newBlockBuffer = currentBuffer
-      currentBuffer = new ArrayBuffer[Any]
-      if (newBlockBuffer.size > 0) {
-        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
-        val newBlock = new Block(blockId, newBlockBuffer)
-        listener.onGenerateBlock(blockId)
+      var newBlock: Block = null
+      synchronized {
+        if (currentBuffer.nonEmpty) {
+          val newBlockBuffer = currentBuffer
+          currentBuffer = new ArrayBuffer[Any]
+          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
+          listener.onGenerateBlock(blockId)
+          newBlock = new Block(blockId, newBlockBuffer)
+        }
+      }
+
+      if (newBlock != null) {
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
-        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
@@ -250,9 +255,13 @@ private[streaming] class BlockGenerator(
   private def keepPushingBlocks() {
     logInfo("Started block pushing thread")
 
-    def isGeneratingBlocks = synchronized { state == Active || state == 
StoppedAddingData }
+    def areBlocksBeingGenerated: Boolean = synchronized {
+      state != StoppedGeneratingBlocks
+    }
+
     try {
-      while (isGeneratingBlocks) {
+      // While blocks are being generated, keep polling for to-be-pushed 
blocks and push them.
+      while (areBlocksBeingGenerated) {
         Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
           case Some(block) => pushBlock(block)
           case None =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to