This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new beb0238273b9 [SPARK-45819][CORE] Support `clear` in REST Submission API
beb0238273b9 is described below

commit beb0238273b937bb42e746f7b240dd63e48f0667
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Nov 7 10:11:19 2023 -0800

[SPARK-45819][CORE] Support `clear` in REST Submission API

### What changes were proposed in this pull request?

This PR aims to support `clear` in REST Submission API to clear `completed` drivers and apps.

### Why are the changes needed?

This new feature is helpful for users to reset the completed drivers and apps in Spark Master.

**"1 Completed"**

<img width="672" alt="Screenshot 2023-11-07 at 12 56 02 AM" src="https://github.com/apache/spark/assets/9700541/1ba9e01c-3c66-4161-b7c6-b86b57837ae0">

**After invoking `clear` API, "0 Completed"**

```
$ curl -X POST http://max.local:6066/v1/submissions/clear
{
  "action" : "ClearResponse",
  "message" : "",
  "serverSparkVersion" : "4.0.0-SNAPSHOT",
  "success" : true
}
```

<img width="677" alt="Screenshot 2023-11-07 at 12 56 24 AM" src="https://github.com/apache/spark/assets/9700541/a6ee816b-217e-4f93-bd55-a0c8a53c4729">

### Does this PR introduce _any_ user-facing change?

No, this is a new API.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43698 from dongjoon-hyun/SPARK-45819.

Lead-authored-by: Dongjoon Hyun <dh...@apple.com> Co-authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/deploy/DeployMessage.scala | 2 ++ .../org/apache/spark/deploy/master/Master.scala | 8 +++++ .../spark/deploy/rest/RestSubmissionClient.scala | 35 ++++++++++++++++++++++ .../spark/deploy/rest/RestSubmissionServer.scala | 22 +++++++++++++- .../spark/deploy/rest/StandaloneRestServer.scala | 19 ++++++++++++ .../deploy/rest/SubmitRestProtocolResponse.scala | 10 +++++++ .../deploy/rest/StandaloneRestSubmitSuite.scala | 33 ++++++++++++++++++++ 7 files changed, 128 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 4ec0edd5909e..f49530461b4d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -238,6 +238,8 @@ private[deploy] object DeployMessages { case class DriverStatusResponse(found: Boolean, state: Option[DriverState], workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception]) + case object RequestClearCompletedDriversAndApps extends DeployMessage + // Internal message in AppClient case object StopAppClient diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e63d72ebb40d..3ba50318610b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -460,6 +460,14 @@ private[deploy] class Master( } } + case RequestClearCompletedDriversAndApps => + val numDrivers = completedDrivers.length + val numApps = completedApps.length + logInfo(s"Asked to clear $numDrivers completed drivers and $numApps completed apps.") + completedDrivers.clear() + completedApps.clear() + 
context.reply(true) + case RequestDriverStatus(driverId) => if (state != RecoveryState.ALIVE) { val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 68f08dd951ef..3010efc936f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -135,6 +135,35 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { response } + /** Request that the server clears all submissions and applications. */ + def clear(): SubmitRestProtocolResponse = { + logInfo(s"Submitting a request to clear $master.") + var handled: Boolean = false + var response: SubmitRestProtocolResponse = null + for (m <- masters if !handled) { + validateMaster(m) + val url = getClearUrl(m) + try { + response = post(url) + response match { + case k: ClearResponse => + if (!Utils.responseFromBackup(k.message)) { + handleRestResponse(k) + handled = true + } + case unexpected => + handleUnexpectedRestResponse(unexpected) + } + } catch { + case e: SubmitRestConnectionException => + if (handleConnectionException(m)) { + throw new SubmitRestConnectionException("Unable to connect to server", e) + } + } + } + response + } + /** Request the status of a submission from the server. */ def requestSubmissionStatus( submissionId: String, @@ -300,6 +329,12 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { new URL(s"$baseUrl/kill/$submissionId") } + /** Return the REST URL for clear all existing submissions and applications. */ + private def getClearUrl(master: String): URL = { + val baseUrl = getBaseUrl(master) + new URL(s"$baseUrl/clear") + } + /** Return the REST URL for requesting the status of an existing submission. 
*/ private def getStatusUrl(master: String, submissionId: String): URL = { val baseUrl = getBaseUrl(master) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index 41845dc31a98..3323d0f529eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -55,6 +55,7 @@ private[spark] abstract class RestSubmissionServer( protected val submitRequestServlet: SubmitRequestServlet protected val killRequestServlet: KillRequestServlet protected val statusRequestServlet: StatusRequestServlet + protected val clearRequestServlet: ClearRequestServlet private var _server: Option[Server] = None @@ -64,6 +65,7 @@ private[spark] abstract class RestSubmissionServer( s"$baseContext/create/*" -> submitRequestServlet, s"$baseContext/kill/*" -> killRequestServlet, s"$baseContext/status/*" -> statusRequestServlet, + s"$baseContext/clear/*" -> clearRequestServlet, "/*" -> new ErrorServlet // default handler ) @@ -227,6 +229,24 @@ private[rest] abstract class KillRequestServlet extends RestServlet { protected def handleKill(submissionId: String): KillSubmissionResponse } +/** + * A servlet for handling clear requests passed to the [[RestSubmissionServer]]. + */ +private[rest] abstract class ClearRequestServlet extends RestServlet { + + /** + * Clear the completed drivers and apps. + */ + protected override def doPost( + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + val responseMessage = handleClear() + sendResponse(responseMessage, response) + } + + protected def handleClear(): ClearResponse +} + /** * A servlet for handling status requests passed to the [[RestSubmissionServer]]. */ @@ -311,7 +331,7 @@ private class ErrorServlet extends RestServlet { "Missing the /submissions prefix." 
case `serverVersion` :: "submissions" :: tail => // http://host:port/correct-version/submissions/* - "Missing an action: please specify one of /create, /kill, or /status." + "Missing an action: please specify one of /create, /kill, /clear or /status." case unknownVersion :: tail => // http://host:port/unknown-version/* versionMismatch = true diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index a298e4f6dbf0..8ed716428dc2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -65,6 +65,8 @@ private[deploy] class StandaloneRestServer( new StandaloneKillRequestServlet(masterEndpoint, masterConf) protected override val statusRequestServlet = new StandaloneStatusRequestServlet(masterEndpoint, masterConf) + protected override val clearRequestServlet = + new StandaloneClearRequestServlet(masterEndpoint, masterConf) } /** @@ -107,6 +109,23 @@ private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe } } +/** + * A servlet for handling clear requests passed to the [[StandaloneRestServer]]. + */ +private[rest] class StandaloneClearRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf) + extends ClearRequestServlet { + + protected def handleClear(): ClearResponse = { + val response = masterEndpoint.askSync[Boolean]( + DeployMessages.RequestClearCompletedDriversAndApps) + val c = new ClearResponse + c.serverSparkVersion = sparkVersion + c.message = "" + c.success = response + c + } +} + /** * A servlet for handling submit requests passed to the [[StandaloneRestServer]]. 
*/ diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala index 0e226ee294ca..21614c22285f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala @@ -55,6 +55,16 @@ private[spark] class KillSubmissionResponse extends SubmitRestProtocolResponse { } } +/** + * A response to a clear request in the REST application submission protocol. + */ +private[spark] class ClearResponse extends SubmitRestProtocolResponse { + protected override def doValidate(): Unit = { + super.doValidate() + assertFieldIsSet(success, "success") + } +} + /** * A response to a status request in the REST application submission protocol. */ diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 3cd96670c8b5..d775aa6542dc 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -227,6 +227,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite { assert(statusResponse.submissionId === doesNotExist) } + test("SPARK-45819: clear") { + val masterUrl = startDummyServer() + val response = new RestSubmissionClient(masterUrl).clear() + val clearResponse = getClearResponse(response) + assert(clearResponse.action === Utils.getFormattedClassName(clearResponse)) + assert(clearResponse.serverSparkVersion === SPARK_VERSION) + assert(clearResponse.success) + } + /* ---------------------------------------- * | Aberrant client / server behavior | * ---------------------------------------- */ @@ -505,6 +514,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite { } } + /** Return the response as a clear 
response, or fail with error otherwise. */ + private def getClearResponse(response: SubmitRestProtocolResponse): ClearResponse = { + response match { + case k: ClearResponse => k + case e: ErrorResponse => fail(s"Server returned error: ${e.message}") + case r => fail(s"Expected clear response. Actual: ${r.toJson}") + } + } + /** Return the response as a status response, or fail with error otherwise. */ private def getStatusResponse(response: SubmitRestProtocolResponse): SubmissionStatusResponse = { response match { @@ -574,6 +592,8 @@ private class DummyMaster( context.reply(KillDriverResponse(self, driverId, success = true, killMessage)) case RequestDriverStatus(driverId) => context.reply(DriverStatusResponse(found = true, Some(state), None, None, exception)) + case RequestClearCompletedDriversAndApps => + context.reply(true) } } @@ -617,6 +637,7 @@ private class SmarterMaster(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEn * When handling a submit request, the server returns a malformed JSON. * When handling a kill request, the server returns an invalid JSON. * When handling a status request, the server throws an internal exception. + * When handling a clear request, the server throws an internal exception. * The purpose of this class is to test that client handles these cases gracefully. */ private class FaultyStandaloneRestServer( @@ -630,6 +651,7 @@ private class FaultyStandaloneRestServer( protected override val submitRequestServlet = new MalformedSubmitServlet protected override val killRequestServlet = new InvalidKillServlet protected override val statusRequestServlet = new ExplodingStatusServlet + protected override val clearRequestServlet = new ExplodingClearServlet /** A faulty servlet that produces malformed responses. */ class MalformedSubmitServlet @@ -660,4 +682,15 @@ private class FaultyStandaloneRestServer( s } } + + /** A faulty clear servlet that explodes. 
*/ + class ExplodingClearServlet extends StandaloneClearRequestServlet(masterEndpoint, masterConf) { + private def explode: Int = 1 / 0 + + protected override def handleClear(): ClearResponse = { + val s = super.handleClear() + s.message = explode.toString + s + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org