Repository: spark Updated Branches: refs/heads/master f3bfb711c -> 18a761ef7
[SPARK-9968] [STREAMING] Reduced time spent within synchronized block to prevent lock starvation When the rate limiter is actually limiting the rate at which data is inserted into the buffer, the synchronized block of BlockGenerator.addData stays blocked for long time. This causes the thread switching the buffer and generating blocks (synchronized with addData) to starve and not generate blocks for seconds. The correct solution is to not block on the rate limiter within the synchronized block for adding data to the buffer. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #8204 from tdas/SPARK-9968 and squashes the following commits: 8cbcc1b [Tathagata Das] Removed unused val a73b645 [Tathagata Das] Reduced time spent within synchronized block Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/18a761ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/18a761ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/18a761ef Branch: refs/heads/master Commit: 18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67 Parents: f3bfb71 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Fri Aug 14 15:54:14 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Aug 14 15:54:14 2015 -0700 ---------------------------------------------------------------------- .../streaming/receiver/BlockGenerator.scala | 40 ++++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/18a761ef/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 794dece..300e820 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -155,10 +155,17 @@ private[streaming] class BlockGenerator( /** * Push a single data item into the buffer. */ - def addData(data: Any): Unit = synchronized { + def addData(data: Any): Unit = { if (state == Active) { waitToPush() - currentBuffer += data + synchronized { + if (state == Active) { + currentBuffer += data + } else { + throw new SparkException( + "Cannot add data as BlockGenerator has not been started or has been stopped") + } + } } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") @@ -169,11 +176,18 @@ private[streaming] class BlockGenerator( * Push a single data item into the buffer. After buffering the data, the * `BlockGeneratorListener.onAddData` callback will be called. */ - def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized { + def addDataWithCallback(data: Any, metadata: Any): Unit = { if (state == Active) { waitToPush() - currentBuffer += data - listener.onAddData(data, metadata) + synchronized { + if (state == Active) { + currentBuffer += data + listener.onAddData(data, metadata) + } else { + throw new SparkException( + "Cannot add data as BlockGenerator has not been started or has been stopped") + } + } } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") @@ -185,13 +199,23 @@ private[streaming] class BlockGenerator( * `BlockGeneratorListener.onAddData` callback will be called. Note that all the data items * are atomically added to the buffer, and are hence guaranteed to be present in a single block. */ - def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = synchronized { + def addMultipleDataWithCallback(dataIterator: Iterator[Any], metadata: Any): Unit = { if (state == Active) { + // Unroll iterator into a temp buffer, and wait for pushing in the process + val tempBuffer = new ArrayBuffer[Any] dataIterator.foreach { data => waitToPush() - currentBuffer += data + tempBuffer += data + } + synchronized { + if (state == Active) { + currentBuffer ++= tempBuffer + listener.onAddData(tempBuffer, metadata) + } else { + throw new SparkException( + "Cannot add data as BlockGenerator has not been started or has been stopped") + } } - listener.onAddData(dataIterator, metadata) } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org