This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ef2e730662f2 [SPARK-46664][CORE] Improve `Master` to recover quickly in case of zero workers and apps
ef2e730662f2 is described below

commit ef2e730662f270e83a87a2395dfbf30a919b25c2
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Wed Jan 10 16:50:51 2024 -0800

    [SPARK-46664][CORE] Improve `Master` to recover quickly in case of zero workers and apps

    ### What changes were proposed in this pull request?

    This PR aims to improve `Master` to recover quickly in case of zero workers and apps.

    ### Why are the changes needed?

    This case happens during initial cluster creation or while the cluster is being restarted.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs with the newly added test case.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #44673 from dongjoon-hyun/SPARK-46664.

    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/master/Master.scala    | 23 ++++++++++++++-------
 .../apache/spark/deploy/master/MasterSuite.scala   | 24 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c8679c185ad7..696ce05bce48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -244,12 +244,13 @@ private[deploy] class Master(
       }
       logInfo("I have been elected leader! New state: " + state)
       if (state == RecoveryState.RECOVERING) {
-        beginRecovery(storedApps, storedDrivers, storedWorkers)
-        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
-          override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(CompleteRecovery)
-          }
-        }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
+        if (beginRecovery(storedApps, storedDrivers, storedWorkers)) {
+          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(CompleteRecovery)
+            }
+          }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
+        }
       }

     case CompleteRecovery => completeRecovery()
@@ -590,7 +591,7 @@ private[deploy] class Master(
   private var recoveryStartTimeNs = 0L

   private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
-      storedWorkers: Seq[WorkerInfo]): Unit = {
+      storedWorkers: Seq[WorkerInfo]): Boolean = {
     recoveryStartTimeNs = System.nanoTime()
     for (app <- storedApps) {
       logInfo("Trying to recover app: " + app.id)
@@ -619,6 +620,14 @@ private[deploy] class Master(
         case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
       }
     }
+
+    // In case of zero workers and apps, we can complete recovery.
+    if (canCompleteRecovery) {
+      completeRecovery()
+      false
+    } else {
+      true
+    }
   }

   private def completeRecovery(): Unit = {
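For readers skimming the patch: the key contract change is that `beginRecovery` now returns `false` when recovery finishes synchronously (nothing stored to wait for), so the `ElectedLeader` handler skips scheduling the `CompleteRecovery` timeout entirely. Below is a minimal, self-contained sketch of that control flow. `RecoveryCoordinator`, `StoredState`, and `onElectedLeader` are illustrative stand-ins, not Spark's internals; only the return-value convention of `beginRecovery` is taken from the diff.

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    // Illustrative stand-in; Spark's real Master keeps richer WorkerInfo/ApplicationInfo state.
    final case class StoredState(workers: Seq[String], apps: Seq[String])

    class RecoveryCoordinator(scheduler: ScheduledExecutorService, recoveryTimeoutMs: Long) {

      // Mirrors the patched contract: returns true if a completion timeout is still
      // needed, false if recovery completed synchronously (nothing left to wait for).
      private def beginRecovery(stored: StoredState): Boolean = {
        stored.apps.foreach(app => println(s"Trying to recover app: $app"))
        stored.workers.foreach(w => println(s"Trying to recover worker: $w"))
        if (stored.workers.isEmpty && stored.apps.isEmpty) {
          completeRecovery() // zero workers and apps: finish immediately
          false
        } else {
          true
        }
      }

      def onElectedLeader(stored: StoredState): Unit = {
        if (beginRecovery(stored)) {
          // Only pay for the recovery timeout when something may still reconnect.
          scheduler.schedule(new Runnable {
            override def run(): Unit = completeRecovery()
          }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)
        }
      }

      private def completeRecovery(): Unit =
        println("Recovery complete; master state is now ALIVE")
    }

    object RecoveryDemo extends App {
      val pool = Executors.newSingleThreadScheduledExecutor()
      // An empty snapshot (fresh cluster) goes ALIVE immediately instead of
      // waiting out the recovery timeout.
      new RecoveryCoordinator(pool, recoveryTimeoutMs = 60000L)
        .onElectedLeader(StoredState(Nil, Nil))
      pool.shutdown()
    }

In the actual patch the fast path goes through the pre-existing `canCompleteRecovery` check, which (in recent Spark sources) requires that no worker or application remain in an UNKNOWN state; the empty-snapshot case sketched above is the special case this PR targets.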
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index e15a5db770eb..6490ee3e8241 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -250,6 +250,30 @@ class MasterSuite extends SparkFunSuite
     CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
   }

+  test("SPARK-46664: master should recover quickly in case of zero workers and apps") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "CUSTOM")
+    conf.set(RECOVERY_MODE_FACTORY, classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set(MASTER_REST_SERVER_ENABLED, false)
+
+    var master: Master = null
+    try {
+      master = makeMaster(conf)
+      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        getState(master) should be(RecoveryState.ALIVE)
+      }
+      master.workers.size should be(0)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+        FakeRecoveryModeFactory.persistentData.clear()
+      }
+    }
+  }
+
   test("master correctly recover the application") {
     val conf = new SparkConf(loadDefaults = false)
     conf.set(RECOVERY_MODE, "CUSTOM")
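A note on the test's timing bound: without this change, a master with an empty persisted snapshot would sit in RECOVERING until the recovery timeout fired (`spark.deploy.recoveryTimeout`, which, if I read the config chain correctly, falls back to the worker timeout of 60 seconds when unset), so asserting ALIVE within 2 seconds exercises the new fast path. The polling construct is ScalaTest's `Eventually`; here is a standalone sketch of the same pattern, with illustrative names (`state`, `EventuallyDemo`) that are not part of the test above:

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallyDemo extends App {
      @volatile var state = "RECOVERING"
      // Simulate a background transition, like the Master going ALIVE.
      new Thread(() => { Thread.sleep(200); state = "ALIVE" }).start()

      // Retries the block every 100 ms until it passes, failing after 2 s.
      eventually(timeout(2.seconds), interval(100.milliseconds)) {
        assert(state == "ALIVE")
      }
      println(s"state = $state")
    }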