Repository: spark
Updated Branches:
  refs/heads/branch-1.1 d6b8d2c03 -> eac740e9a


[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'

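Rewrote `ContextWaiter` using a `ReentrantLock` and its `Condition`, because `Condition`
provides a convenient `awaitNanos` API for timed waits. The wait loops now re-check the
stopped/error state after every wakeup, so a spurious wakeup no longer ends the wait early.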

Author: zsxwing <zsxw...@gmail.com>

Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits:

52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'

(cherry picked from commit 6a897829444e2ef273586511f93a40d36e64fb0b)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eac740e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eac740e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eac740e9

Branch: refs/heads/branch-1.1
Commit: eac740e9a11d852d4a192d8c8f63862ed29c43fd
Parents: d6b8d2c
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Dec 30 14:39:13 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Dec 30 14:39:53 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/ContextWaiter.scala  | 63 +++++++++++++++-----
 1 file changed, 48 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eac740e9/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
index a0aeacb..fdbbe2a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
@@ -17,30 +17,63 @@
 
 package org.apache.spark.streaming
 
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
 private[streaming] class ContextWaiter {
+
+  private val lock = new ReentrantLock()
+  private val condition = lock.newCondition()
+
+  // Guarded by "lock"
   private var error: Throwable = null
-  private var stopped: Boolean = false
 
-  def notifyError(e: Throwable) = synchronized {
-    error = e
-    notifyAll()
-  }
+  // Guarded by "lock"
+  private var stopped: Boolean = false
 
-  def notifyStop() = synchronized {
-    stopped = true
-    notifyAll()
+  def notifyError(e: Throwable): Unit = {
+    lock.lock()
+    try {
+      error = e
+      condition.signalAll()
+    } finally {
+      lock.unlock()
+    }
   }
 
-  def waitForStopOrError(timeout: Long = -1) = synchronized {
-    // If already had error, then throw it
-    if (error != null) {
-      throw error
+  def notifyStop(): Unit = {
+    lock.lock()
+    try {
+      stopped = true
+      condition.signalAll()
+    } finally {
+      lock.unlock()
     }
+  }
 
-    // If not already stopped, then wait
-    if (!stopped) {
-      if (timeout < 0) wait() else wait(timeout)
+  /**
+   * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
+   * `false` if the waiting time detectably elapsed before return from the method.
+   */
+  def waitForStopOrError(timeout: Long = -1): Boolean = {
+    lock.lock()
+    try {
+      if (timeout < 0) {
+        while (!stopped && error == null) {
+          condition.await()
+        }
+      } else {
+        var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
+        while (!stopped && error == null && nanos > 0) {
+          nanos = condition.awaitNanos(nanos)
+        }
+      }
+      // If already had error, then throw it
       if (error != null) throw error
+      // already stopped or timeout
+      stopped
+    } finally {
+      lock.unlock()
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to