Repository: spark
Updated Branches:
  refs/heads/master 680b4e7bc -> 53e9cee3e


[SPARK-11466][CORE] Avoid mockito in multi-threaded FsHistoryProviderSuite test.

The test functionality should be the same, but without using mockito; the logs
don't really say anything useful, but I suspect mockito may be the cause of the
flakiness, since updating mocks while multiple threads may be using them doesn't
work very well. It also allows some other cleanup (= less test code in
FsHistoryProvider).

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #9425 from vanzin/SPARK-11466.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53e9cee3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53e9cee3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53e9cee3

Branch: refs/heads/master
Commit: 53e9cee3e4e845d1f875c487215c0f22503347b1
Parents: 680b4e7
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Tue Nov 3 16:26:28 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Nov 3 16:26:28 2015 -0800

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 31 ++++++---------
 .../deploy/history/FsHistoryProviderSuite.scala | 42 ++++++++++----------
 2 files changed, 34 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 24aa386..718efc4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -113,35 +113,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   // Conf option used for testing the initialization code.
-  val initThread = if 
(!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
-      initialize(None)
-    } else {
-      null
-    }
+  val initThread = initialize()
 
-  private[history] def initialize(errorHandler: 
Option[Thread.UncaughtExceptionHandler]): Thread = {
+  private[history] def initialize(): Thread = {
     if (!isFsInSafeMode()) {
       startPolling()
-      return null
+      null
+    } else {
+      startSafeModeCheckThread(None)
     }
+  }
 
+  private[history] def startSafeModeCheckThread(
+      errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
     // Cannot probe anything while the FS is in safe mode, so spawn a new 
thread that will wait
     // for the FS to leave safe mode before enabling polling. This allows the 
main history server
     // UI to be shown (so that the user can see the HDFS status).
-    //
-    // The synchronization in the run() method is needed because of the tests; 
mockito can
-    // misbehave if the test is modifying the mocked methods while the thread 
is calling
-    // them.
     val initThread = new Thread(new Runnable() {
       override def run(): Unit = {
         try {
-          clock.synchronized {
-            while (isFsInSafeMode()) {
-              logInfo("HDFS is still in safe mode. Waiting...")
-              val deadline = clock.getTimeMillis() +
-                TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
-              clock.waitTillTime(deadline)
-            }
+          while (isFsInSafeMode()) {
+            logInfo("HDFS is still in safe mode. Waiting...")
+            val deadline = clock.getTimeMillis() +
+              TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+            clock.waitTillTime(deadline)
           }
           startPolling()
         } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/53e9cee3/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 833aab1..5cab17f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -41,7 +41,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
+import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with 
Matchers with Logging {
 
@@ -423,22 +423,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
   test("provider waits for safe mode to finish before initializing") {
     val clock = new ManualClock()
-    val conf = createTestConf().set("spark.history.testing.skipInitialize", 
"true")
-    val provider = spy(new FsHistoryProvider(conf, clock))
-    doReturn(true).when(provider).isFsInSafeMode()
-
-    val initThread = provider.initialize(None)
+    val provider = new SafeModeTestProvider(createTestConf(), clock)
+    val initThread = provider.initialize()
     try {
       provider.getConfig().keys should contain ("HDFS State")
 
       clock.setTime(5000)
       provider.getConfig().keys should contain ("HDFS State")
 
-      // Synchronization needed because of mockito.
-      clock.synchronized {
-        doReturn(false).when(provider).isFsInSafeMode()
-        clock.setTime(10000)
-      }
+      provider.inSafeMode = false
+      clock.setTime(10000)
 
       eventually(timeout(1 second), interval(10 millis)) {
         provider.getConfig().keys should not contain ("HDFS State")
@@ -451,18 +445,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("provider reports error after FS leaves safe mode") {
     testDir.delete()
     val clock = new ManualClock()
-    val conf = createTestConf().set("spark.history.testing.skipInitialize", 
"true")
-    val provider = spy(new FsHistoryProvider(conf, clock))
-    doReturn(true).when(provider).isFsInSafeMode()
-
+    val provider = new SafeModeTestProvider(createTestConf(), clock)
     val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
-    val initThread = provider.initialize(Some(errorHandler))
+    val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
     try {
-      // Synchronization needed because of mockito.
-      clock.synchronized {
-        doReturn(false).when(provider).isFsInSafeMode()
-        clock.setTime(10000)
-      }
+      provider.inSafeMode = false
+      clock.setTime(10000)
 
       eventually(timeout(1 second), interval(10 millis)) {
         verify(errorHandler).uncaughtException(any(), any())
@@ -530,4 +518,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     log
   }
 
+  private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
+    extends FsHistoryProvider(conf, clock) {
+
+    @volatile var inSafeMode = true
+
+    // Skip initialization so that we can manually start the safe mode check 
thread.
+    private[history] override def initialize(): Thread = null
+
+    private[history] override def isFsInSafeMode(): Boolean = inSafeMode
+
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to