Repository: spark Updated Branches: refs/heads/branch-1.2 7a245412f -> edc96d81d
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout. Author: zsxwing <zsxw...@gmail.com> Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits: 52247f5 [zsxwing] Add explicit unit type be42bcf [zsxwing] Update as per review suggestion e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup' (cherry picked from commit 6a897829444e2ef273586511f93a40d36e64fb0b) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edc96d81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edc96d81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edc96d81 Branch: refs/heads/branch-1.2 Commit: edc96d81df66c5cb36e13fe93ab47b66a0a8a02b Parents: 7a24541 Author: zsxwing <zsxw...@gmail.com> Authored: Tue Dec 30 14:39:13 2014 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Dec 30 14:39:36 2014 -0800 ---------------------------------------------------------------------- .../apache/spark/streaming/ContextWaiter.scala | 63 +++++++++++++++----- 1 file changed, 48 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/edc96d81/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index a0aeacb..fdbbe2a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -17,30 +17,63 @@ package org.apache.spark.streaming +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + private[streaming] class ContextWaiter { + + private val lock = new ReentrantLock() + private val condition = lock.newCondition() + + // Guarded by "lock" private var error: Throwable = null - private var stopped: Boolean = false - def notifyError(e: Throwable) = synchronized { - error = e - notifyAll() - } + // Guarded by "lock" + private var stopped: Boolean = false - def notifyStop() = synchronized { - stopped = true - notifyAll() + def notifyError(e: Throwable): Unit = { + lock.lock() + try { + error = e + condition.signalAll() + } finally { + lock.unlock() + } } - def waitForStopOrError(timeout: Long = -1) = synchronized { - // If already had error, then throw it - if (error != null) { - throw error + def notifyStop(): Unit = { + lock.lock() + try { + stopped = true + condition.signalAll() + } finally { + lock.unlock() } + } - // If not already stopped, then wait - if (!stopped) { - if (timeout < 0) wait() else wait(timeout) + /** + * Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or + * `false` if the waiting time detectably elapsed before return from the method. + */ + def waitForStopOrError(timeout: Long = -1): Boolean = { + lock.lock() + try { + if (timeout < 0) { + while (!stopped && error == null) { + condition.await() + } + } else { + var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) + while (!stopped && error == null && nanos > 0) { + nanos = condition.awaitNanos(nanos) + } + } + // If already had error, then throw it if (error != null) throw error + // already stopped or timeout + stopped + } finally { + lock.unlock() } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org