This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b45b63f9a [CELEBORN-247][FOLLOWUP] Add metrics for each user's quota 
usage of Celeborn Worker
b45b63f9a is described below

commit b45b63f9a56b20c4ca81d5c1cb8429e34a211ef4
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 1 15:48:31 2023 +0800

    [CELEBORN-247][FOLLOWUP] Add metrics for each user's quota usage of 
Celeborn Worker
    
    ### What changes were proposed in this pull request?
    
    Add the metric `ResourceConsumption` to monitor each user's quota usage of 
Celeborn Worker.
    
    ### Why are the changes needed?
    
    The metric `ResourceConsumption` supports monitoring each user's quota 
usage of Celeborn Master at present. The quota usage of Celeborn Worker also 
needs to be monitored.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
    
    Closes #2059 from SteNicholas/CELEBORN-247.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 METRICS.md                                         |  8 +--
 .../org/apache/celeborn/common/CelebornConf.scala  |  9 +++
 .../metrics/source/ResourceConsumptionSource.scala | 15 ++++-
 docs/configuration/worker.md                       |  1 +
 docs/monitoring.md                                 |  8 +++
 .../celeborn/service/deploy/master/Master.scala    | 19 +++++--
 .../service/deploy/master/MasterSource.scala       |  9 ++-
 .../celeborn/service/deploy/worker/Worker.scala    | 64 +++++++++++++++++++---
 .../service/deploy/worker/WorkerSource.scala       |  4 +-
 9 files changed, 111 insertions(+), 26 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index e764371da..b4f10cdea 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -83,10 +83,10 @@ Here is an example of Grafana dashboard importing.
 |             PartitionSize              |      master       |          The 
estimated partition size of last 20 flush window whose length is 15 seconds by 
defaults.           |
 |            PartitionWritten            |      master       |                 
                           The active shuffle size.                             
                |
 |           PartitionFileCount           |      master       |                 
                      The active shuffle partition count.                       
                |
-|             diskFileCount              |      master       |                 
               The count of disk files consumption by each user.                
                |
-|            diskBytesWritten            |      master       |                 
              The amount of disk files consumption by each user.                
                |
-|             hdfsFileCount              |      master       |                 
               The count of hdfs files consumption by each user.                
                |
-|            hdfsBytesWritten            |      master       |                 
              The amount of hdfs files consumption by each user.                
                |
+|             diskFileCount              | master and worker |                 
               The count of disk files consumption by each user.                
                |
+|            diskBytesWritten            | master and worker |                 
              The amount of disk files consumption by each user.                
                |
+|             hdfsFileCount              | master and worker |                 
               The count of hdfs files consumption by each user.                
                |
+|            hdfsBytesWritten            | master and worker |                 
              The amount of hdfs files consumption by each user.                
                |
 |         RegisteredShuffleCount         | master and worker |                 
                 The value means count of registered shuffle.                   
                |
 |            CommitFilesTime             |      worker       |                 
          CommitFiles means flush and close a shuffle partition file.           
                |
 |            ReserveSlotsTime            |      worker       |                 
    ReserveSlots means acquire a disk buffer and record partition location.     
                |
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3c717fa5a..52e2a414b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -521,6 +521,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def estimatedPartitionSizeForEstimationUpdateInterval: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
   def masterResourceConsumptionInterval: Long = 
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
+  def workerResourceConsumptionInterval: Long = 
get(WORKER_RESOURCE_CONSUMPTION_INTERVAL)
 
   // //////////////////////////////////////////////////////
   //               Address && HA && RATIS                //
@@ -2018,6 +2019,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
 
+  val WORKER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.worker.userResourceConsumption.update.interval")
+      .categories("worker")
+      .doc("Time length for a window about compute user resource consumption.")
+      .version("0.3.2")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")
+
   val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.shuffle.chunk.size")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
index f705f121d..df33310bb 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
@@ -19,9 +19,18 @@ package org.apache.celeborn.common.metrics.source
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.MetricsSystem
 
-class ResourceConsumptionSource(conf: CelebornConf)
-  extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
+class ResourceConsumptionSource(conf: CelebornConf, role: String)
+  extends AbstractSource(conf, role) with Logging {
   override val sourceName = "ResourceConsumption"
 }
+
+object ResourceConsumptionSource {
+  val DISK_FILE_COUNT = "diskFileCount"
+
+  val DISK_BYTES_WRITTEN = "diskBytesWritten"
+
+  val HDFS_FILE_COUNT = "hdfsFileCount"
+
+  val HDFS_BYTES_WRITTEN = "hdfsBytesWritten"
+}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8154101b0..984be9f4f 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -105,6 +105,7 @@ license: |
 | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved 
space for each disk. | 0.3.0 | 
 | celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire 
dirs to be deleted on disk. | 0.3.2 | 
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's 
working dir path name. | 0.3.0 | 
+| celeborn.worker.userResourceConsumption.update.interval | 30s | Time length 
for a window about compute user resource consumption. | 0.3.2 | 
 | celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to 
close | 0.2.0 | 
 | celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file 
writer to create if its creation was failed. | 0.2.0 | 
 <!--end-include-->
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e5745ee56..1c82c1c1e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -266,6 +266,14 @@ These metrics are exposed by Celeborn worker.
       [Dropwizard/Codahale Metric Sets for JVM 
instrumentation](https://metrics.dropwizard.io/4.2.0/manual/jvm.html)
       and in particular the metric sets BufferPoolMetricSet, 
GarbageCollectorMetricSet and MemoryUsageGaugeSet.
 
+  - namespace=ResourceConsumption
+    - **notes:**
+        - This metrics data is generated for each user and they are identified 
using a metric tag.
+    - diskFileCount
+    - diskBytesWritten
+    - hdfsFileCount
+    - hdfsBytesWritten
+
 **Note:**
 
 The Netty DirectArenaMetrics named like `push/fetch/replicate_server_numXX` 
are not exposed by default, nor in Grafana dashboard.
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1e9c813e6..ead05b375 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -152,7 +152,8 @@ private[celeborn] class Master(
   private val slotsAssignPolicy = conf.masterSlotAssignPolicy
 
   // init and register master metrics
-  val resourceConsumptionSource = new ResourceConsumptionSource(conf)
+  private val resourceConsumptionSource =
+    new ResourceConsumptionSource(conf, MetricsSystem.ROLE_MASTER)
   private val masterSource = new MasterSource(conf)
   private var hadoopFs: FileSystem = _
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
@@ -840,16 +841,24 @@ private[celeborn] class Master(
       userIdentifier: UserIdentifier,
       context: RpcCallContext): Unit = {
 
-    resourceConsumptionSource.addGauge("diskFileCount", userIdentifier.toMap) 
{ () =>
+    resourceConsumptionSource.addGauge(
+      ResourceConsumptionSource.DISK_FILE_COUNT,
+      userIdentifier.toMap) { () =>
       computeUserResourceConsumption(userIdentifier).diskFileCount
     }
-    resourceConsumptionSource.addGauge("diskBytesWritten", 
userIdentifier.toMap) { () =>
+    resourceConsumptionSource.addGauge(
+      ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+      userIdentifier.toMap) { () =>
       computeUserResourceConsumption(userIdentifier).diskBytesWritten
     }
-    resourceConsumptionSource.addGauge("hdfsFileCount", userIdentifier.toMap) 
{ () =>
+    resourceConsumptionSource.addGauge(
+      ResourceConsumptionSource.HDFS_FILE_COUNT,
+      userIdentifier.toMap) { () =>
       computeUserResourceConsumption(userIdentifier).hdfsFileCount
     }
-    resourceConsumptionSource.addGauge("hdfsBytesWritten", 
userIdentifier.toMap) { () =>
+    resourceConsumptionSource.addGauge(
+      ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+      userIdentifier.toMap) { () =>
       computeUserResourceConsumption(userIdentifier).hdfsBytesWritten
     }
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 05c14b7df..8010f0a74 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -18,15 +18,14 @@
 package org.apache.celeborn.service.deploy.master
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.MetricsSystem
 import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.service.deploy.master.MasterSource.OFFER_SLOTS_TIME
 
-class MasterSource(conf: CelebornConf)
-  extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
-  override val sourceName = s"master"
+class MasterSource(conf: CelebornConf) extends AbstractSource(conf, 
MetricsSystem.ROLE_MASTER) {
+  override val sourceName = "master"
 
+  import MasterSource._
+  // add timers
   addTimer(OFFER_SLOTS_TIME)
   // start cleaner
   startCleaner()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 6c0f981ce..b3bef5688 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker
 
 import java.io.File
 import java.lang.{Long => JLong}
+import java.util
 import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
@@ -36,7 +37,7 @@ import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, 
WorkerPartitionLocationInfo}
 import org.apache.celeborn.common.metrics.MetricsSystem
-import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, 
SystemMiscSource}
+import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, 
ResourceConsumptionSource, SystemMiscSource}
 import org.apache.celeborn.common.network.TransportContext
 import org.apache.celeborn.common.protocol.{PartitionType, 
PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants, 
TransportModuleConstants}
 import org.apache.celeborn.common.protocol.message.ControlMessages._
@@ -104,7 +105,10 @@ private[celeborn] class Worker(
     }
   }
 
+  private val resourceConsumptionSource =
+    new ResourceConsumptionSource(conf, MetricsSystem.ROLE_WORKER)
   val workerSource = new WorkerSource(conf)
+  metricsSystem.registerSource(resourceConsumptionSource)
   metricsSystem.registerSource(workerSource)
   metricsSystem.registerSource(new JVMSource(conf, MetricsSystem.ROLE_WORKER))
   metricsSystem.registerSource(new JVMCPUSource(conf, 
MetricsSystem.ROLE_WORKER))
@@ -263,6 +267,10 @@ private[celeborn] class Worker(
   private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]]
   var cleaner: Thread = _
 
+  private val workerResourceConsumptionInterval = 
conf.workerResourceConsumptionInterval
+  private val userResourceConsumptions =
+    JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, 
Long)]()
+
   workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
     workerInfo.getShuffleKeySet.size
   }
@@ -335,9 +343,6 @@ private[celeborn] class Worker(
       workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { 
disk =>
         disk.mountPoint -> disk
       }.toMap.asJava).values().asScala.toSeq
-    val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
-      storageManager.userResourceConsumptionSnapshot().asJava)
-
     val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
         host,
@@ -346,7 +351,7 @@ private[celeborn] class Worker(
         fetchPort,
         replicatePort,
         diskInfos,
-        resourceConsumption,
+        handleResourceConsumption(),
         activeShuffleKeys,
         estimatedAppDiskUsage,
         highWorkload),
@@ -486,8 +491,7 @@ private[celeborn] class Worker(
               // Use WorkerInfo's diskInfo since re-register when heartbeat 
return not-registered,
               // StorageManager have update the disk info.
               workerInfo.diskInfos.asScala.toMap,
-              workerInfo.updateThenGetUserResourceConsumption(
-                
storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap,
+              handleResourceConsumption().asScala.toMap,
               MasterClient.genRequestId()),
             classOf[PbRegisterWorkerResponse])
         } catch {
@@ -511,6 +515,52 @@ private[celeborn] class Worker(
     // If worker register still failed after retry, throw exception to stop 
worker process
     throw new CelebornException("Register worker failed.", exception)
   }
+
+  private def handleResourceConsumption(): util.Map[UserIdentifier, 
ResourceConsumption] = {
+    val resourceConsumptionSnapshot = 
storageManager.userResourceConsumptionSnapshot()
+    resourceConsumptionSnapshot.foreach { resourceConsumption =>
+      {
+        resourceConsumptionSource.addGauge(
+          ResourceConsumptionSource.DISK_FILE_COUNT,
+          resourceConsumption._1.toMap) { () =>
+          computeUserResourceConsumption(resourceConsumption).diskFileCount
+        }
+        resourceConsumptionSource.addGauge(
+          ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+          resourceConsumption._1.toMap) { () =>
+          computeUserResourceConsumption(resourceConsumption).diskBytesWritten
+        }
+        resourceConsumptionSource.addGauge(
+          ResourceConsumptionSource.HDFS_FILE_COUNT,
+          resourceConsumption._1.toMap) { () =>
+          computeUserResourceConsumption(resourceConsumption).hdfsFileCount
+        }
+        resourceConsumptionSource.addGauge(
+          ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+          resourceConsumption._1.toMap) { () =>
+          computeUserResourceConsumption(resourceConsumption).hdfsBytesWritten
+        }
+      }
+    }
+    
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
+  }
+
+  private def computeUserResourceConsumption(userResourceConsumption: (
+      UserIdentifier,
+      ResourceConsumption)): ResourceConsumption = {
+    val userIdentifier = userResourceConsumption._1
+    val resourceConsumption = userResourceConsumption._2
+    val current = System.currentTimeMillis()
+    if (userResourceConsumptions.containsKey(userIdentifier)) {
+      val resourceConsumptionAndUpdateTime = 
userResourceConsumptions.get(userIdentifier)
+      if (current - resourceConsumptionAndUpdateTime._2 <= 
workerResourceConsumptionInterval) {
+        return resourceConsumptionAndUpdateTime._1
+      }
+    }
+    userResourceConsumptions.put(userIdentifier, (resourceConsumption, 
current))
+    resourceConsumption
+  }
+
   @VisibleForTesting
   def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
     expiredShuffleKeys.asScala.foreach { shuffleKey =>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index df07970ab..f46b0ff90 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -38,7 +38,7 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addCounter(REGION_FINISH_FAIL_COUNT)
   addCounter(ACTIVE_CONNECTION_COUNT)
 
-  // add Timers
+  // add timers
   addTimer(COMMIT_FILES_TIME)
   addTimer(RESERVE_SLOTS_TIME)
   addTimer(FLUSH_DATA_TIME)
@@ -130,7 +130,7 @@ object WorkerSource {
   val USER_PRODUCE_SPEED = "UserProduceSpeed"
   val WORKER_CONSUME_SPEED = "WorkerConsumeSpeed"
 
-  // active shuffle size
+  // active shuffle
   val ACTIVE_SHUFFLE_SIZE = "ActiveShuffleSize"
   val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
 }

Reply via email to