This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f75c7a7b5240 [SPARK-46883][CORE] Support `/json/clusterutilization` API
f75c7a7b5240 is described below

commit f75c7a7b52402e4c8faa39b2f88623e9f0bca916
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sat Jan 27 09:21:17 2024 -0800

    [SPARK-46883][CORE] Support `/json/clusterutilization` API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support a new `/json/clusterutilization` API in the `Master` JSON endpoint.
    
    ### Why are the changes needed?
    
    The user can get CPU and memory utilization and the number of waiting drivers in a single API call.
    ```
    # Start Spark Cluster and Spark Shell
    $ sbin/start-master.sh
    $ sbin/start-worker.sh spark://$(hostname):7077;
    $ bin/spark-shell --master spark://$(hostname):7077
    
    # Check `Cluster Utilization API`
    $ curl http://localhost:8080/json/clusterutilization
    {
      "waitingDrivers" : 0,
      "cores" : 10,
      "coresused" : 10,
      "coresutilization" : 100,
      "memory" : 31744,
      "memoryused" : 1024,
      "memoryutilization" : 3
    }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a newly added API.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44908 from dongjoon-hyun/SPARK-46883.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/JsonProtocol.scala      | 18 ++++++++++++++++++
 .../apache/spark/deploy/master/ui/MasterPage.scala  |  2 ++
 .../org/apache/spark/deploy/JsonProtocolSuite.scala | 21 +++++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 8c356081b277..9c73e84f4166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -299,4 +299,22 @@ private[deploy] object JsonProtocol {
     ("executors" -> obj.executors.map(writeExecutorRunner)) ~
     ("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
   }
+
+  /**
+   * Export the cluster utilization based on the [[MasterStateResponse]] to a Json object.
+   *
+   * Only workers for which `isAlive()` is true contribute to the totals. `coresutilization`
+   * and `memoryutilization` are integer percentages produced by integer division.
+   *
+   * When the alive capacity is zero (no alive workers, or their cores/memory sum to zero) the
+   * corresponding utilization is reported as 100 rather than failing with an
+   * `ArithmeticException` from integer division by zero, since no resources are free.
+   *
+   * @param obj the master state to summarize
+   * @return a Json object with the fields `waitingDrivers`, `cores`, `coresused`,
+   *         `coresutilization`, `memory`, `memoryused` and `memoryutilization`
+   */
+  def writeClusterUtilization(obj: MasterStateResponse): JObject = {
+    // Widen to Long before multiplying so that `100 * used` cannot overflow Int on large
+    // clusters; the result is converted back to Int so the emitted Json type is unchanged.
+    def percent(used: Int, total: Int): Int =
+      if (total == 0) 100 else (100L * used / total).toInt
+
+    val aliveWorkers = obj.workers.filter(_.isAlive())
+    val cores = aliveWorkers.map(_.cores).sum
+    val coresUsed = aliveWorkers.map(_.coresUsed).sum
+    val memory = aliveWorkers.map(_.memory).sum
+    val memoryUsed = aliveWorkers.map(_.memoryUsed).sum
+    ("waitingDrivers" -> obj.activeDrivers.count(_.state == DriverState.SUBMITTED)) ~
+    ("cores" -> cores) ~
+    ("coresused" -> coresUsed) ~
+    ("coresutilization" -> percent(coresUsed, cores)) ~
+    ("memory" -> memory) ~
+    ("memoryused" -> memoryUsed) ~
+    ("memoryutilization" -> percent(memoryUsed, memory))
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 36a79e060f01..cbeda23013ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -41,6 +41,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   override def renderJson(request: HttpServletRequest): JValue = {
     jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match {
       case None => JsonProtocol.writeMasterState(getMasterState)
+      // A captured name of "clusterutilization" is answered by the dedicated utilization writer.
+      // This guarded case must stay ahead of the generic `Some(m)` case, which matches any
+      // capture and would otherwise hand the name to `writeMasterState`.
+      case Some(m) if m.group(1) == "clusterutilization" =>
+        JsonProtocol.writeClusterUtilization(getMasterState)
       case Some(m) => JsonProtocol.writeMasterState(getMasterState, 
Some(m.group(1)))
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 4a6ace6facde..6fca31234ee2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -105,6 +105,20 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
     assertValidDataInJson(output, 
JsonMethods.parse(JsonConstants.workerStateJsonStr))
   }
 
+  test("SPARK-46883: writeClusterUtilization") {
+    // Master state with two workers, one active app, and one active plus one completed driver.
+    val workerPair = Array.fill(2)(createWorkerInfo())
+    val masterState = new MasterStateResponse(
+      "host", 8080, None, workerPair, Array(createAppInfo()), Array.empty[ApplicationInfo],
+      Array(createDriverInfo()), Array(createDriverInfo()), RecoveryState.ALIVE)
+    val utilizationJson = JsonProtocol.writeClusterUtilization(masterState)
+    assertValidJson(utilizationJson)
+    // Compare against the expected document kept in `JsonConstants`.
+    val expectedJson = JsonMethods.parse(JsonConstants.clusterUtilizationJsonStr)
+    assertValidDataInJson(utilizationJson, expectedJson)
+  }
+
   def assertValidJson(json: JValue): Unit = {
     try {
       JsonMethods.parse(JsonMethods.compact(json))
@@ -206,4 +220,11 @@ object JsonConstants {
       |"executors":[],
       |"finishedexecutors":[%s,%s]}
     """.format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin
+
+  /**
+   * Expected JSON for the `writeClusterUtilization` test: one waiting driver, 8 cores and
+   * 2468 memory in total, with nothing used and therefore 0 for both utilization fields.
+   */
+  val clusterUtilizationJsonStr =
+    """
+      |{"waitingDrivers":1,
+      |"cores":8,"coresused":0,"coresutilization":0,
+      |"memory":2468,"memoryused":0,"memoryutilization":0}
+    """.stripMargin
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to