Repository: spark Updated Branches: refs/heads/master 680b4e7bc -> 53e9cee3e
[SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test. The test functionality should be the same, but without using mockito; logs don't really say anything useful but I suspect it may be the cause of the flakiness, since updating mocks when multiple threads may be using it doesn't work very well. It also allows some other cleanup (= less test code in FsHistoryProvider). Author: Marcelo Vanzin <van...@cloudera.com> Closes #9425 from vanzin/SPARK-11466. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e9cee3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e9cee3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e9cee3 Branch: refs/heads/master Commit: 53e9cee3e4e845d1f875c487215c0f22503347b1 Parents: 680b4e7 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Tue Nov 3 16:26:28 2015 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Nov 3 16:26:28 2015 -0800 ---------------------------------------------------------------------- .../deploy/history/FsHistoryProvider.scala | 31 ++++++--------- .../deploy/history/FsHistoryProviderSuite.scala | 42 ++++++++++---------- 2 files changed, 34 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 24aa386..718efc4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -113,35 +113,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } // Conf option used for testing the initialization code. - val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) { - initialize(None) - } else { - null - } + val initThread = initialize() - private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { + private[history] def initialize(): Thread = { if (!isFsInSafeMode()) { startPolling() - return null + null + } else { + startSafeModeCheckThread(None) } + } + private[history] def startSafeModeCheckThread( + errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait // for the FS to leave safe mode before enabling polling. This allows the main history server // UI to be shown (so that the user can see the HDFS status). - // - // The synchronization in the run() method is needed because of the tests; mockito can - // misbehave if the test is modifying the mocked methods while the thread is calling - // them. val initThread = new Thread(new Runnable() { override def run(): Unit = { try { - clock.synchronized { - while (isFsInSafeMode()) { - logInfo("HDFS is still in safe mode. Waiting...") - val deadline = clock.getTimeMillis() + - TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) - clock.waitTillTime(deadline) - } + while (isFsInSafeMode()) { + logInfo("HDFS is still in safe mode. Waiting...") + val deadline = clock.getTimeMillis() + + TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S) + clock.waitTillTime(deadline) } startPolling() } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 833aab1..5cab17f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -41,7 +41,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{Logging, SparkConf, SparkFunSuite} import org.apache.spark.io._ import org.apache.spark.scheduler._ -import org.apache.spark.util.{JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { @@ -423,22 +423,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("provider waits for safe mode to finish before initializing") { val clock = new ManualClock() - val conf = createTestConf().set("spark.history.testing.skipInitialize", "true") - val provider = spy(new FsHistoryProvider(conf, clock)) - doReturn(true).when(provider).isFsInSafeMode() - - val initThread = provider.initialize(None) + val provider = new SafeModeTestProvider(createTestConf(), clock) + val initThread = provider.initialize() try { provider.getConfig().keys should contain ("HDFS State") clock.setTime(5000) provider.getConfig().keys should contain ("HDFS State") - // Synchronization needed because of mockito. - clock.synchronized { - doReturn(false).when(provider).isFsInSafeMode() - clock.setTime(10000) - } + provider.inSafeMode = false + clock.setTime(10000) eventually(timeout(1 second), interval(10 millis)) { provider.getConfig().keys should not contain ("HDFS State") @@ -451,18 +445,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc test("provider reports error after FS leaves safe mode") { testDir.delete() val clock = new ManualClock() - val conf = createTestConf().set("spark.history.testing.skipInitialize", "true") - val provider = spy(new FsHistoryProvider(conf, clock)) - doReturn(true).when(provider).isFsInSafeMode() - + val provider = new SafeModeTestProvider(createTestConf(), clock) val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler]) - val initThread = provider.initialize(Some(errorHandler)) + val initThread = provider.startSafeModeCheckThread(Some(errorHandler)) try { - // Synchronization needed because of mockito. - clock.synchronized { - doReturn(false).when(provider).isFsInSafeMode() - clock.setTime(10000) - } + provider.inSafeMode = false + clock.setTime(10000) eventually(timeout(1 second), interval(10 millis)) { verify(errorHandler).uncaughtException(any(), any()) @@ -530,4 +518,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc log } + private class SafeModeTestProvider(conf: SparkConf, clock: Clock) + extends FsHistoryProvider(conf, clock) { + + @volatile var inSafeMode = true + + // Skip initialization so that we can manually start the safe mode check thread. + private[history] override def initialize(): Thread = null + + private[history] override def isFsInSafeMode(): Boolean = inSafeMode + + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org