This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new beb0238273b9 [SPARK-45819][CORE] Support `clear` in REST Submission API
beb0238273b9 is described below

commit beb0238273b937bb42e746f7b240dd63e48f0667
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Nov 7 10:11:19 2023 -0800

    [SPARK-45819][CORE] Support `clear` in REST Submission API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `clear` in REST Submission API to clear `completed` 
drivers and apps.
    
    ### Why are the changes needed?
    
    This new feature is helpful for users to reset the completed drivers and 
apps in Spark Master.
    
    **"1 Completed"**
    <img width="672" alt="Screenshot 2023-11-07 at 12 56 02 AM" 
src="https://github.com/apache/spark/assets/9700541/1ba9e01c-3c66-4161-b7c6-b86b57837ae0";>
    
    **After invoking `clear` API, "0 Completed"**
    ```
    $ curl -X POST http://max.local:6066/v1/submissions/clear
    {
      "action" : "ClearResponse",
      "message" : "",
      "serverSparkVersion" : "4.0.0-SNAPSHOT",
      "success" : true
    }
    ```
    
    <img width="677" alt="Screenshot 2023-11-07 at 12 56 24 AM" 
src="https://github.com/apache/spark/assets/9700541/a6ee816b-217e-4f93-bd55-a0c8a53c4729";>
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new API.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43698 from dongjoon-hyun/SPARK-45819.
    
    Lead-authored-by: Dongjoon Hyun <dh...@apple.com>
    Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/DeployMessage.scala    |  2 ++
 .../org/apache/spark/deploy/master/Master.scala    |  8 +++++
 .../spark/deploy/rest/RestSubmissionClient.scala   | 35 ++++++++++++++++++++++
 .../spark/deploy/rest/RestSubmissionServer.scala   | 22 +++++++++++++-
 .../spark/deploy/rest/StandaloneRestServer.scala   | 19 ++++++++++++
 .../deploy/rest/SubmitRestProtocolResponse.scala   | 10 +++++++
 .../deploy/rest/StandaloneRestSubmitSuite.scala    | 33 ++++++++++++++++++++
 7 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4ec0edd5909e..f49530461b4d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -238,6 +238,8 @@ private[deploy] object DeployMessages {
   case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
     workerId: Option[String], workerHostPort: Option[String], exception: 
Option[Exception])
 
+  case object RequestClearCompletedDriversAndApps extends DeployMessage
+
   // Internal message in AppClient
 
   case object StopAppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e63d72ebb40d..3ba50318610b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -460,6 +460,14 @@ private[deploy] class Master(
         }
       }
 
+    case RequestClearCompletedDriversAndApps =>
+      val numDrivers = completedDrivers.length
+      val numApps = completedApps.length
+      logInfo(s"Asked to clear $numDrivers completed drivers and $numApps 
completed apps.")
+      completedDrivers.clear()
+      completedApps.clear()
+      context.reply(true)
+
     case RequestDriverStatus(driverId) =>
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 68f08dd951ef..3010efc936f9 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -135,6 +135,35 @@ private[spark] class RestSubmissionClient(master: String) 
extends Logging {
     response
   }
 
+  /** Request that the server clears all submissions and applications. */
+  def clear(): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to clear $master.")
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getClearUrl(m)
+      try {
+        response = post(url)
+        response match {
+          case k: ClearResponse =>
+            if (!Utils.responseFromBackup(k.message)) {
+              handleRestResponse(k)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to 
server", e)
+          }
+      }
+    }
+    response
+  }
+
   /** Request the status of a submission from the server. */
   def requestSubmissionStatus(
       submissionId: String,
@@ -300,6 +329,12 @@ private[spark] class RestSubmissionClient(master: String) 
extends Logging {
     new URL(s"$baseUrl/kill/$submissionId")
   }
 
+  /** Return the REST URL for clear all existing submissions and applications. 
*/
+  private def getClearUrl(master: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/clear")
+  }
+
   /** Return the REST URL for requesting the status of an existing submission. 
*/
   private def getStatusUrl(master: String, submissionId: String): URL = {
     val baseUrl = getBaseUrl(master)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 41845dc31a98..3323d0f529eb 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -55,6 +55,7 @@ private[spark] abstract class RestSubmissionServer(
   protected val submitRequestServlet: SubmitRequestServlet
   protected val killRequestServlet: KillRequestServlet
   protected val statusRequestServlet: StatusRequestServlet
+  protected val clearRequestServlet: ClearRequestServlet
 
   private var _server: Option[Server] = None
 
@@ -64,6 +65,7 @@ private[spark] abstract class RestSubmissionServer(
     s"$baseContext/create/*" -> submitRequestServlet,
     s"$baseContext/kill/*" -> killRequestServlet,
     s"$baseContext/status/*" -> statusRequestServlet,
+    s"$baseContext/clear/*" -> clearRequestServlet,
     "/*" -> new ErrorServlet // default handler
   )
 
@@ -227,6 +229,24 @@ private[rest] abstract class KillRequestServlet extends 
RestServlet {
   protected def handleKill(submissionId: String): KillSubmissionResponse
 }
 
+/**
+ * A servlet for handling clear requests passed to the 
[[RestSubmissionServer]].
+ */
+private[rest] abstract class ClearRequestServlet extends RestServlet {
+
+  /**
+   * Clear the completed drivers and apps.
+   */
+  protected override def doPost(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val responseMessage = handleClear()
+    sendResponse(responseMessage, response)
+  }
+
+  protected def handleClear(): ClearResponse
+}
+
 /**
  * A servlet for handling status requests passed to the 
[[RestSubmissionServer]].
  */
@@ -311,7 +331,7 @@ private class ErrorServlet extends RestServlet {
           "Missing the /submissions prefix."
         case `serverVersion` :: "submissions" :: tail =>
           // http://host:port/correct-version/submissions/*
-          "Missing an action: please specify one of /create, /kill, or 
/status."
+          "Missing an action: please specify one of /create, /kill, /clear or 
/status."
         case unknownVersion :: tail =>
           // http://host:port/unknown-version/*
           versionMismatch = true
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index a298e4f6dbf0..8ed716428dc2 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -65,6 +65,8 @@ private[deploy] class StandaloneRestServer(
     new StandaloneKillRequestServlet(masterEndpoint, masterConf)
   protected override val statusRequestServlet =
     new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
+  protected override val clearRequestServlet =
+    new StandaloneClearRequestServlet(masterEndpoint, masterConf)
 }
 
 /**
@@ -107,6 +109,23 @@ private[rest] class 
StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
   }
 }
 
+/**
+ * A servlet for handling clear requests passed to the 
[[StandaloneRestServer]].
+ */
+private[rest] class StandaloneClearRequestServlet(masterEndpoint: 
RpcEndpointRef, conf: SparkConf)
+  extends ClearRequestServlet {
+
+  protected def handleClear(): ClearResponse = {
+    val response = masterEndpoint.askSync[Boolean](
+      DeployMessages.RequestClearCompletedDriversAndApps)
+    val c = new ClearResponse
+    c.serverSparkVersion = sparkVersion
+    c.message = ""
+    c.success = response
+    c
+  }
+}
+
 /**
  * A servlet for handling submit requests passed to the 
[[StandaloneRestServer]].
  */
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index 0e226ee294ca..21614c22285f 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -55,6 +55,16 @@ private[spark] class KillSubmissionResponse extends 
SubmitRestProtocolResponse {
   }
 }
 
+/**
+ * A response to a clear request in the REST application submission protocol.
+ */
+private[spark] class ClearResponse extends SubmitRestProtocolResponse {
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(success, "success")
+  }
+}
+
 /**
  * A response to a status request in the REST application submission protocol.
  */
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 3cd96670c8b5..d775aa6542dc 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -227,6 +227,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     assert(statusResponse.submissionId === doesNotExist)
   }
 
+  test("SPARK-45819: clear") {
+    val masterUrl = startDummyServer()
+    val response = new RestSubmissionClient(masterUrl).clear()
+    val clearResponse = getClearResponse(response)
+    assert(clearResponse.action === Utils.getFormattedClassName(clearResponse))
+    assert(clearResponse.serverSparkVersion === SPARK_VERSION)
+    assert(clearResponse.success)
+  }
+
   /* ---------------------------------------- *
    |     Aberrant client / server behavior    |
    * ---------------------------------------- */
@@ -505,6 +514,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     }
   }
 
+  /** Return the response as a clear response, or fail with error otherwise. */
+  private def getClearResponse(response: SubmitRestProtocolResponse): 
ClearResponse = {
+    response match {
+      case k: ClearResponse => k
+      case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+      case r => fail(s"Expected clear response. Actual: ${r.toJson}")
+    }
+  }
+
   /** Return the response as a status response, or fail with error otherwise. 
*/
   private def getStatusResponse(response: SubmitRestProtocolResponse): 
SubmissionStatusResponse = {
     response match {
@@ -574,6 +592,8 @@ private class DummyMaster(
       context.reply(KillDriverResponse(self, driverId, success = true, 
killMessage))
     case RequestDriverStatus(driverId) =>
       context.reply(DriverStatusResponse(found = true, Some(state), None, 
None, exception))
+    case RequestClearCompletedDriversAndApps =>
+      context.reply(true)
   }
 }
 
@@ -617,6 +637,7 @@ private class SmarterMaster(override val rpcEnv: RpcEnv) 
extends ThreadSafeRpcEn
  * When handling a submit request, the server returns a malformed JSON.
  * When handling a kill request, the server returns an invalid JSON.
  * When handling a status request, the server throws an internal exception.
+ * When handling a clear request, the server throws an internal exception.
  * The purpose of this class is to test that client handles these cases 
gracefully.
  */
 private class FaultyStandaloneRestServer(
@@ -630,6 +651,7 @@ private class FaultyStandaloneRestServer(
   protected override val submitRequestServlet = new MalformedSubmitServlet
   protected override val killRequestServlet = new InvalidKillServlet
   protected override val statusRequestServlet = new ExplodingStatusServlet
+  protected override val clearRequestServlet = new ExplodingClearServlet
 
   /** A faulty servlet that produces malformed responses. */
   class MalformedSubmitServlet
@@ -660,4 +682,15 @@ private class FaultyStandaloneRestServer(
       s
     }
   }
+
+  /** A faulty clear servlet that explodes. */
+  class ExplodingClearServlet extends 
StandaloneClearRequestServlet(masterEndpoint, masterConf) {
+    private def explode: Int = 1 / 0
+
+    protected override def handleClear(): ClearResponse = {
+      val s = super.handleClear()
+      s.message = explode.toString
+      s
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to