Repository: spark
Updated Branches:
  refs/heads/branch-2.0 733cbaa3c -> dcce0aaaf
[SPARK-15077][SQL] Use a fair lock to avoid thread starvation in StreamExecution

## What changes were proposed in this pull request?

Right now `StreamExecution.awaitBatchLock` uses an unfair lock. `StreamExecution.awaitOffset` may run too long and fail some tests because `StreamExecution.constructNextBatch` keeps getting the lock. See: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.4/865/testReport/junit/org.apache.spark.sql.streaming/FileStreamSourceStressTestSuite/file_source_stress_test/

This PR uses a fair ReentrantLock to resolve the thread starvation issue.

## How was this patch tested?

Modified `FileStreamSourceStressTestSuite.test("file source stress test")` to run the test code 100 times locally. Without this patch, it always fails because of a timeout.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12852 from zsxwing/SPARK-15077.

(cherry picked from commit 4e3685ae5e5826e63bfcd7c3729e3b9cbab484b5)
Signed-off-by: Michael Armbrust <mich...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcce0aaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcce0aaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcce0aaa

Branch: refs/heads/branch-2.0
Commit: dcce0aaafedc496e3e69c02c51ad31f01de05287
Parents: 733cbaa
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon May 2 18:27:49 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon May 2 18:28:01 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala | 79 +++++++++++++-------
 1 file changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
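Before the diff itself, a note on why lock fairness matters here: with an unfair `ReentrantLock` (or a plain `synchronized` monitor), a thread that releases and immediately re-requests the lock can "barge" ahead of threads already queued, so a tight re-acquire loop like `constructNextBatch` can keep a waiter like `awaitOffset` out for a long time; `new ReentrantLock(true)` grants the lock in roughly FIFO order instead. The following standalone sketch (not part of this commit; the thread roles and timing are illustrative only) shows the difference:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object FairLockDemo {
  def main(args: Array[String]): Unit = {
    // `true` = fair, as in this patch; flip to `false` to observe barging.
    val lock = new ReentrantLock(true)

    // Greedy thread: re-acquires the lock in a tight loop, standing in for
    // constructNextBatch repeatedly grabbing awaitBatchLock.
    val greedy = new Thread(new Runnable {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          lock.lock()
          try { /* simulate per-batch bookkeeping */ } finally { lock.unlock() }
        }
      }
    })
    greedy.setDaemon(true)
    greedy.start()

    // Waiter role (here, the main thread), standing in for awaitOffset.
    Thread.sleep(100) // let the greedy loop get going first
    val start = System.nanoTime()
    lock.lock()
    try {
      val waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      println(s"waiter acquired the lock after ${waitedMs} ms")
    } finally {
      lock.unlock()
    }
  }
}
```

With fairness enabled, the waiter is queued FIFO and typically acquires the lock within one critical section of the greedy thread; with fairness disabled, the greedy thread can keep barging ahead of it, which is the starvation mode the stress test hit.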
http://git-wip-us.apache.org/repos/asf/spark/blob/dcce0aaa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ce68c09..3108346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -53,8 +54,12 @@ class StreamExecution(
     val trigger: Trigger)
   extends ContinuousQuery with Logging {
 
-  /** An monitor used to wait/notify when batches complete. */
-  private val awaitBatchLock = new Object
+  /**
+   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
+   */
+  private val awaitBatchLock = new ReentrantLock(true)
+  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -242,17 +247,22 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val hasNewData = awaitBatchLock.synchronized {
-      val newData = microBatchThread.runUninterruptibly {
-        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-      }
-      availableOffsets ++= newData
+    val hasNewData = {
+      awaitBatchLock.lock()
+      try {
+        val newData = microBatchThread.runUninterruptibly {
+          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+        }
+        availableOffsets ++= newData
 
-      if (dataAvailable) {
-        true
-      } else {
-        noNewData = true
-        false
+        if (dataAvailable) {
+          true
+        } else {
+          noNewData = true
+          false
+        }
+      } finally {
+        awaitBatchLock.unlock()
       }
     }
     if (hasNewData) {
@@ -269,9 +279,12 @@ class StreamExecution(
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
-      awaitBatchLock.synchronized {
+      awaitBatchLock.lock()
+      try {
         // Wake up any threads that are waiting for the stream to progress.
-        awaitBatchLock.notifyAll()
+        awaitBatchLockCondition.signalAll()
+      } finally {
+        awaitBatchLock.unlock()
       }
     }
   }
@@ -332,9 +345,12 @@ class StreamExecution(
         new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
     sink.addBatch(currentBatchId - 1, nextBatch)
 
-    awaitBatchLock.synchronized {
+    awaitBatchLock.lock()
+    try {
       // Wake up any threads that are waiting for the stream to progress.
-      awaitBatchLock.notifyAll()
+      awaitBatchLockCondition.signalAll()
+    } finally {
+      awaitBatchLock.unlock()
     }
 
     val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
@@ -374,8 +390,12 @@ class StreamExecution(
     }
     while (notDone) {
-      logInfo(s"Waiting until $newOffset at $source")
-      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
+      awaitBatchLock.lock()
+      try {
+        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+      } finally {
+        awaitBatchLock.unlock()
+      }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
@@ -383,16 +403,21 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false
 
-  override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
-    noNewData = false
-    while (true) {
-      awaitBatchLock.wait(10000)
-      if (streamDeathCause != null) {
-        throw streamDeathCause
-      }
-      if (noNewData) {
-        return
+  override def processAllAvailable(): Unit = {
+    awaitBatchLock.lock()
+    try {
+      noNewData = false
+      while (true) {
+        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
+        if (streamDeathCause != null) {
+          throw streamDeathCause
+        }
+        if (noNewData) {
+          return
+        }
       }
+    } finally {
+      awaitBatchLock.unlock()
     }
   }
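For reference, the wait/notify idiom the diff adopts reduces to the following standalone sketch (illustrative only, not Spark code; the names merely mirror the patch): a fair `ReentrantLock` plus a `Condition` replaces the old `synchronized`/`wait`/`notifyAll` monitor, and waiters always re-check their predicate in a loop because `Condition.await` can return on timeout or spuriously.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object BatchSignalSketch {
  private val lock = new ReentrantLock(true)  // fair, like awaitBatchLock
  private val batchDone = lock.newCondition() // like awaitBatchLockCondition
  @volatile private var done = false          // @volatile mirrors noNewData

  // Producer side, as in runBatch/constructNextBatch: update state and
  // signal all waiters while holding the lock.
  def signalBatchComplete(): Unit = {
    lock.lock()
    try {
      done = true
      batchDone.signalAll() // the Condition analogue of notifyAll()
    } finally {
      lock.unlock()
    }
  }

  // Consumer side, as in awaitOffset/processAllAvailable: await with a
  // timeout and re-check the predicate in a loop.
  def awaitBatchComplete(): Unit = {
    lock.lock()
    try {
      while (!done) {
        batchDone.await(100, TimeUnit.MILLISECONDS)
      }
    } finally {
      lock.unlock()
    }
  }
}
```

The timed `await` matches the patch's `await(100, TimeUnit.MILLISECONDS)` and `await(10000, TimeUnit.MILLISECONDS)` calls: even if a signal were missed, waiters wake periodically and re-evaluate, which is also why `processAllAvailable` checks `streamDeathCause` and `noNewData` inside its loop rather than relying on the wake-up alone.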