This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 29c0f7e50 [CELEBORN-1501] Introduce application dimension resource 
consumption metrics of Worker
29c0f7e50 is described below

commit 29c0f7e50faa8bef28d1d68c8ac87014a3e8f9f1
Author: SteNicholas <[email protected]>
AuthorDate: Fri Sep 6 14:34:23 2024 +0800

    [CELEBORN-1501] Introduce application dimension resource consumption 
metrics of Worker
    
    ### What changes were proposed in this pull request?
    
    Introduce application dimension resource consumption metrics of Worker for 
`ResourceConsumptionSource`.
    
    ### Why are the changes needed?
    
    `ResourceConsumption` namespace metrics are generated for each user and 
they are identified using a metric tag at present. It's recommended to 
introduce application dimension resource consumption metrics that expose 
application dimension resource consumption of Worker. By monitoring resource 
consumption in the application dimension, you can obtain the actual situation 
of application resource consumption.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    ```
    curl http://celeborn-worker:9096/metrics|grep applicationId|grep disk|head 
-20
    
metrics_diskFileCount_Value{applicationId="application_1720756171504_197094_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 42 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_197094_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 27157332949 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1718714878734_1549139_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 47 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1549139_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 48304559082 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1688369676084_19713351_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 20 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1688369676084_19713351_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 13112170199 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1718714878734_1552645_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 45 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1552645_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 35335034306 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1718714878734_1552665_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 59 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1552665_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 47637375731 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1720756171504_199529_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 59 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_199529_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 54106810966 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1720756171504_199536_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 19 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1720756171504_199536_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 9215818606 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1650016801129_34416161_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 26 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1650016801129_34416161_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 23650636804 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1716712852097_2884119_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 12 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1716712852097_2884119_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 650314937 1721132143020
    
metrics_diskFileCount_Value{applicationId="application_1718714878734_1563526_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 16 1721132143020
    
metrics_diskBytesWritten_Value{applicationId="application_1718714878734_1563526_1",hostName="celeborn-worker",name="default",role="Worker",tenantId="default"}
 1555862722 1721132143020
    ```
    <img width="1351" alt="image" 
src="https://github.com/user-attachments/assets/3e007e80-7329-467b-bf74-cfe502b62ae5">
    <img width="1351" alt="image" 
src="https://github.com/user-attachments/assets/d93ee335-c078-46b8-b682-3b1a04f8a614">
    <img width="1351" alt="image" 
src="https://github.com/user-attachments/assets/62790378-38aa-480f-b959-6fdbad617808">
    <img width="1352" alt="image" 
src="https://github.com/user-attachments/assets/b6717316-0b44-4a7b-a55b-4ffa844ded66">
    
    Closes #2630 from SteNicholas/CELEBORN-1292.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 12 ++++
 docs/configuration/metrics.md                      |  1 +
 .../celeborn/service/deploy/worker/Worker.scala    | 74 +++++++++++++++++-----
 3 files changed, 72 insertions(+), 15 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index a5cd18b3c..47f44a132 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -864,6 +864,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def metricsAppTopDiskUsageCount: Int = get(METRICS_APP_TOP_DISK_USAGE_COUNT)
   def metricsAppTopDiskUsageWindowSize: Int = 
get(METRICS_APP_TOP_DISK_USAGE_WINDOW_SIZE)
   def metricsAppTopDiskUsageInterval: Long = 
get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
+  def metricsWorkerAppTopResourceConsumptionCount: Int =
+    get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
   def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
     get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
   def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
@@ -4942,6 +4944,16 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("10min")
 
+  val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] =
+    buildConf("celeborn.metrics.worker.app.topResourceConsumption.count")
+      .categories("metrics")
+      .doc("Size for top items about top resource consumption applications 
list of worker. " +
+        "The top resource consumption is determined by sum of diskBytesWritten 
and hdfsBytesWritten. " +
+        "The top resource consumption count prevents the total number of 
metrics from exceeding the metrics capacity.")
+      .version("0.6.0")
+      .intConf
+      .createWithDefault(50)
+
   val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] 
=
     buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
       .categories("metrics")
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index 356a56a27..a4d32421a 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -32,5 +32,6 @@ license: |
 | celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context 
path of prometheus metrics HTTP server. | 0.4.0 |  | 
 | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect 
timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | 
 | 
 | celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding 
window size of timer metric. | 0.2.0 |  | 
+| celeborn.metrics.worker.app.topResourceConsumption.count | 50 | false | Size 
for top items about top resource consumption applications list of worker. The 
top resource consumption is determined by sum of diskBytesWritten and 
hdfsBytesWritten. The top resource consumption count prevents the total number 
of metrics from exceeding the metrics capacity. | 0.6.0 |  | 
 | celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | false | 
Force append worker pause spent time even if worker still in pause serving 
state.Help user can find worker pause spent time increase, when worker always 
been pause state. |  |  | 
 <!--end-include-->
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 632701aef..f33c41c3a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -82,6 +82,10 @@ private[celeborn] class Worker(
   metricsSystem.registerSource(new JVMCPUSource(conf, 
MetricsSystem.ROLE_WORKER))
   metricsSystem.registerSource(new SystemMiscSource(conf, 
MetricsSystem.ROLE_WORKER))
 
+  private val topResourceConsumptionCount = 
conf.metricsWorkerAppTopResourceConsumptionCount
+  private val topApplicationUserIdentifiers =
+    JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
+
   val workerStatusManager = new WorkerStatusManager(conf)
   private val authEnabled = conf.authEnabled
   private val secretRegistry = new 
WorkerSecretRegistryImpl(conf.workerApplicationRegistryCacheSize)
@@ -661,55 +665,75 @@ private[celeborn] class Worker(
     val resourceConsumptionSnapshot = 
storageManager.userResourceConsumptionSnapshot()
     val userResourceConsumptions =
       
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
-    resourceConsumptionSnapshot.foreach { case (userIdentifier, 
userResourceConsumption) =>
+    resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
       gaugeResourceConsumption(userIdentifier)
     }
+    handleTopResourceConsumption(userResourceConsumptions)
     userResourceConsumptions
   }
 
+  private def handleTopResourceConsumption(userResourceConsumptions: util.Map[
+    UserIdentifier,
+    ResourceConsumption]): Unit = {
+    // Remove application top resource consumption gauges to refresh top 
resource consumption metrics.
+    
removeAppResourceConsumption(topApplicationUserIdentifiers.keySet().asScala)
+    // Top resource consumption is determined by 
diskBytesWritten+hdfsBytesWritten.
+    userResourceConsumptions.asScala.filter(userResourceConsumption =>
+      
CollectionUtils.isNotEmpty(userResourceConsumption._2.subResourceConsumptions))
+      .flatMap(userResourceConsumption =>
+        
userResourceConsumption._2.subResourceConsumptions.asScala.map(subResourceConsumption
 =>
+          (subResourceConsumption._1, (userResourceConsumption._1, 
subResourceConsumption._2))))
+      .toSeq
+      .sortBy(resourceConsumption =>
+        resourceConsumption._2._2.diskBytesWritten + 
resourceConsumption._2._2.hdfsBytesWritten)
+      .reverse
+      .take(topResourceConsumptionCount).foreach { topResourceConsumption =>
+        val applicationId = topResourceConsumption._1
+        val userIdentifier = topResourceConsumption._2._1
+        topApplicationUserIdentifiers.put(applicationId, userIdentifier)
+        gaugeResourceConsumption(userIdentifier, applicationId, 
topResourceConsumption._2._2)
+      }
+  }
+
   private def gaugeResourceConsumption(
       userIdentifier: UserIdentifier,
-      applicationId: String = null): Unit = {
+      applicationId: String = null,
+      resourceConsumption: ResourceConsumption = null): Unit = {
     var resourceConsumptionLabel = userIdentifier.toMap
     if (applicationId != null)
       resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel 
-> applicationId)
     resourceConsumptionSource.addGauge(
       ResourceConsumptionSource.DISK_FILE_COUNT,
       resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, applicationId).diskFileCount
+      computeResourceConsumption(userIdentifier, 
resourceConsumption).diskFileCount
     }
     resourceConsumptionSource.addGauge(
       ResourceConsumptionSource.DISK_BYTES_WRITTEN,
       resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, 
applicationId).diskBytesWritten
+      computeResourceConsumption(userIdentifier, 
resourceConsumption).diskBytesWritten
     }
     resourceConsumptionSource.addGauge(
       ResourceConsumptionSource.HDFS_FILE_COUNT,
       resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
+      computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsFileCount
     }
     resourceConsumptionSource.addGauge(
       ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
       resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, 
applicationId).hdfsBytesWritten
+      computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsBytesWritten
     }
   }
 
   private def computeResourceConsumption(
       userIdentifier: UserIdentifier,
-      applicationId: String = null): ResourceConsumption = {
-    var resourceConsumption =
+      resourceConsumption: ResourceConsumption = null): ResourceConsumption = {
+    if (resourceConsumption == null) {
       workerInfo.userResourceConsumption.getOrDefault(
         userIdentifier,
         ResourceConsumption(0, 0, 0, 0))
-    if (applicationId != null) {
-      val subResourceConsumptions = resourceConsumption.subResourceConsumptions
-      if (CollectionUtils.isNotEmpty(subResourceConsumptions)) {
-        resourceConsumption =
-          subResourceConsumptions.getOrDefault(applicationId, 
ResourceConsumption(0, 0, 0, 0))
-      }
+    } else {
+      resourceConsumption
     }
-    resourceConsumption
   }
 
   @VisibleForTesting
@@ -734,6 +758,7 @@ private[celeborn] class Worker(
       fetchHandler.cleanupExpiredShuffleKey(expiredShuffleKeys)
       threadPool.execute(new Runnable {
         override def run(): Unit = {
+          removeAppResourceConsumption(expiredApplicationIds.asScala)
           removeAppActiveConnection(expiredApplicationIds)
           workerSource.sample(
             WorkerSource.CLEAN_EXPIRED_SHUFFLE_KEYS_TIME,
@@ -744,6 +769,25 @@ private[celeborn] class Worker(
       })
     }
 
+  private def removeAppResourceConsumption(applicationIds: Iterable[String]): 
Unit = {
+    applicationIds.foreach { applicationId => 
removeAppResourceConsumption(applicationId) }
+  }
+
+  private def removeAppResourceConsumption(applicationId: String): Unit = {
+    val userIdentifier = topApplicationUserIdentifiers.remove(applicationId)
+    if (userIdentifier != null) {
+      removeAppResourceConsumption(
+        userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> 
applicationId))
+    }
+  }
+
+  private def removeAppResourceConsumption(resourceConsumptionLabel: 
Map[String, String]): Unit = {
+    workerSource.removeGauge(ResourceConsumptionSource.DISK_FILE_COUNT, 
resourceConsumptionLabel)
+    workerSource.removeGauge(ResourceConsumptionSource.DISK_BYTES_WRITTEN, 
resourceConsumptionLabel)
+    workerSource.removeGauge(ResourceConsumptionSource.HDFS_FILE_COUNT, 
resourceConsumptionLabel)
+    workerSource.removeGauge(ResourceConsumptionSource.HDFS_BYTES_WRITTEN, 
resourceConsumptionLabel)
+  }
+
   private def removeAppActiveConnection(applicationIds: JHashSet[String]): 
Unit = {
     workerSource.removeAppActiveConnection(applicationIds)
   }

Reply via email to