Repository: spark Updated Branches: refs/heads/master b4b35f133 -> 1aeae05bb
[SPARK-10072] [STREAMING] BlockGenerator can deadlock when the queue of generated blocks fills up to capacity Generated blocks are inserted into an ArrayBlockingQueue, and another thread pulls stuff from the ArrayBlockingQueue and pushes it into BlockManager. Now if that queue fills up to capacity (default is 10 blocks), then inserting into the queue (done in the function updateCurrentBuffer) gets blocked inside a synchronized block. However, the thread that is pulling blocks from the queue uses the same lock to check the current state (active or stopped) while pulling from the queue. Since the block-generating thread is blocked (as the queue is full) on the lock, this thread that is supposed to drain the queue gets blocked. Ergo, deadlock. Solution: Moved the blocking call to ArrayBlockingQueue outside the synchronized block to prevent deadlock. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #8257 from tdas/SPARK-10072. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1aeae05b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1aeae05b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1aeae05b Branch: refs/heads/master Commit: 1aeae05bb20f01ab7ccaa62fe905a63e020074b5 Parents: b4b35f1 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Aug 18 19:26:38 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Aug 18 19:26:38 2015 -0700 ---------------------------------------------------------------------- .../streaming/receiver/BlockGenerator.scala | 29 +++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1aeae05b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 300e820..421d60a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -227,16 +227,21 @@ private[streaming] class BlockGenerator( def isStopped(): Boolean = state == StoppedAll /** Change the buffer to which single records are added to. */ - private def updateCurrentBuffer(time: Long): Unit = synchronized { + private def updateCurrentBuffer(time: Long): Unit = { try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[Any] - if (newBlockBuffer.size > 0) { - val blockId = StreamBlockId(receiverId, time - blockIntervalMs) - val newBlock = new Block(blockId, newBlockBuffer) - listener.onGenerateBlock(blockId) + var newBlock: Block = null + synchronized { + if (currentBuffer.nonEmpty) { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[Any] + val blockId = StreamBlockId(receiverId, time - blockIntervalMs) + listener.onGenerateBlock(blockId) + newBlock = new Block(blockId, newBlockBuffer) + } + } + + if (newBlock != null) { blocksForPushing.put(newBlock) // put is blocking when queue is full - logDebug("Last element in " + blockId + " is " + newBlockBuffer.last) } } catch { case ie: InterruptedException => @@ -250,9 +255,13 @@ private[streaming] class BlockGenerator( private def keepPushingBlocks() { logInfo("Started block pushing thread") - def isGeneratingBlocks = synchronized { state == Active || state == StoppedAddingData } + def areBlocksBeingGenerated: Boolean = synchronized { + state != StoppedGeneratingBlocks + } + try { - while (isGeneratingBlocks) { + // While blocks are being generated, keep polling for to-be-pushed blocks and push them. 
+ while (areBlocksBeingGenerated) { Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { case Some(block) => pushBlock(block) case None => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org