Repository: spark
Updated Branches:
  refs/heads/branch-2.0 733cbaa3c -> dcce0aaaf


[SPARK-15077][SQL] Use a fair lock to avoid thread starvation in StreamExecution

## What changes were proposed in this pull request?

Right now `StreamExecution.awaitBatchLock` uses an unfair lock. 
`StreamExecution.awaitOffset` may run for too long and fail some tests 
because `StreamExecution.constructNextBatch` keeps re-acquiring the lock, 
starving the waiting thread.

See: 
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.4/865/testReport/junit/org.apache.spark.sql.streaming/FileStreamSourceStressTestSuite/file_source_stress_test/

This PR uses a fair ReentrantLock to resolve the thread starvation issue.
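
For reference, here is a minimal sketch of the locking pattern the patch 
adopts. The `FairLockSketch` object and the `awaitProgress`/`signalProgress` 
helper names are illustrative only; in `StreamExecution` the equivalent 
bodies are inlined at the call sites shown in the diff below.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object FairLockSketch {
  // `true` requests a fair lock: threads acquire it in roughly arrival
  // order, so a busy signaller cannot indefinitely starve a waiter.
  val awaitBatchLock = new ReentrantLock(true)
  val awaitBatchLockCondition = awaitBatchLock.newCondition()

  // Waiter side: replaces awaitBatchLock.synchronized { awaitBatchLock.wait(100) }.
  def awaitProgress(): Unit = {
    awaitBatchLock.lock()
    try {
      awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
    } finally {
      awaitBatchLock.unlock()
    }
  }

  // Signaller side: replaces awaitBatchLock.synchronized { awaitBatchLock.notifyAll() }.
  def signalProgress(): Unit = {
    awaitBatchLock.lock()
    try {
      awaitBatchLockCondition.signalAll()
    } finally {
      awaitBatchLock.unlock()
    }
  }
}
```

Unlike an unfair intrinsic monitor, a fair `ReentrantLock` hands the lock to 
the longest-waiting thread, so `awaitOffset` eventually gets a turn even 
while `constructNextBatch` is acquiring the lock in a tight loop.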

## How was this patch tested?

Modified `FileStreamSourceStressTestSuite.test("file source stress test")` to 
run the test body 100 times locally. Without this patch, it always fails with 
a timeout.
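
That repeated-run modification is not part of this diff; a hypothetical way 
to express it in ScalaTest (with `runStressTest()` standing in for the 
suite's existing test body) is:

```scala
// Register the same stress test 100 times so ScalaTest runs it repeatedly.
// runStressTest() is a placeholder for the existing test body in
// FileStreamSourceStressTestSuite.
for (i <- 1 to 100) {
  test(s"file source stress test (run $i)") {
    runStressTest()
  }
}
```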

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12852 from zsxwing/SPARK-15077.

(cherry picked from commit 4e3685ae5e5826e63bfcd7c3729e3b9cbab484b5)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcce0aaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcce0aaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcce0aaa

Branch: refs/heads/branch-2.0
Commit: dcce0aaafedc496e3e69c02c51ad31f01de05287
Parents: 733cbaa
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon May 2 18:27:49 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon May 2 18:28:01 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 79 +++++++++++++-------
 1 file changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dcce0aaa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ce68c09..3108346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -53,8 +54,12 @@ class StreamExecution(
     val trigger: Trigger)
   extends ContinuousQuery with Logging {
 
-  /** An monitor used to wait/notify when batches complete. */
-  private val awaitBatchLock = new Object
+  /**
+   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
+   */
+  private val awaitBatchLock = new ReentrantLock(true)
+  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)
 
@@ -242,17 +247,22 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val hasNewData = awaitBatchLock.synchronized {
-      val newData = microBatchThread.runUninterruptibly {
-        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-      }
-      availableOffsets ++= newData
+    val hasNewData = {
+      awaitBatchLock.lock()
+      try {
+        val newData = microBatchThread.runUninterruptibly {
+          uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+        }
+        availableOffsets ++= newData
 
-      if (dataAvailable) {
-        true
-      } else {
-        noNewData = true
-        false
+        if (dataAvailable) {
+          true
+        } else {
+          noNewData = true
+          false
+        }
+      } finally {
+        awaitBatchLock.unlock()
       }
     }
     if (hasNewData) {
@@ -269,9 +279,12 @@ class StreamExecution(
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
-      awaitBatchLock.synchronized {
+      awaitBatchLock.lock()
+      try {
         // Wake up any threads that are waiting for the stream to progress.
-        awaitBatchLock.notifyAll()
+        awaitBatchLockCondition.signalAll()
+      } finally {
+        awaitBatchLock.unlock()
       }
     }
   }
@@ -332,9 +345,12 @@ class StreamExecution(
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
     sink.addBatch(currentBatchId - 1, nextBatch)
 
-    awaitBatchLock.synchronized {
+    awaitBatchLock.lock()
+    try {
       // Wake up any threads that are waiting for the stream to progress.
-      awaitBatchLock.notifyAll()
+      awaitBatchLockCondition.signalAll()
+    } finally {
+      awaitBatchLock.unlock()
     }
 
     val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
@@ -374,8 +390,12 @@ class StreamExecution(
     }
 
     while (notDone) {
-      logInfo(s"Waiting until $newOffset at $source")
-      awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
+      awaitBatchLock.lock()
+      try {
+        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+      } finally {
+        awaitBatchLock.unlock()
+      }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
@@ -383,16 +403,21 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false
 
-  override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
-    noNewData = false
-    while (true) {
-      awaitBatchLock.wait(10000)
-      if (streamDeathCause != null) {
-        throw streamDeathCause
-      }
-      if (noNewData) {
-        return
+  override def processAllAvailable(): Unit = {
+    awaitBatchLock.lock()
+    try {
+      noNewData = false
+      while (true) {
+        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
+        if (streamDeathCause != null) {
+          throw streamDeathCause
+        }
+        if (noNewData) {
+          return
+        }
       }
+    } finally {
+      awaitBatchLock.unlock()
     }
   }
 

