This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b938195b [MINOR] feat(server): Introduce metrics related 
bitmaps(committedBlockIds/cachedBlockIds/partitionToBlockIds) (#2186)
6b938195b is described below

commit 6b938195b0986afd04f0c882ef031570960d560c
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 24 11:30:33 2024 +0800

    [MINOR] feat(server): Introduce metrics related 
bitmaps(committedBlockIds/cachedBlockIds/partitionToBlockIds) (#2186)
    
    ### What changes were proposed in this pull request?
    
    Introduce metrics that report the number of block IDs held in the server's block-ID bitmaps.
    
    ### Why are the changes needed?
    
    To provide insight into how many block IDs each of the block bitmaps holds.
    
    Fix: # (issue)
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Three new server metrics are exposed:
    
    - committed_block_count
    - reported_block_count
    - cached_block_count
    
    ### How was this patch tested?
    
    Locally.
    
    <img width="2512" alt="image" 
src="https://github.com/user-attachments/assets/c57706db-a13f-44f5-93fd-235825122f5d">
---
 .../apache/uniffle/server/ShuffleFlushManager.java | 10 ++++++++++
 .../uniffle/server/ShuffleServerMetrics.java       |  4 ++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 22 ++++++++++++++++++++++
 3 files changed, 36 insertions(+)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 7d62bb96c..61fdc466a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -48,6 +48,7 @@ import 
org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.COMMITTED_BLOCK_COUNT;
 
 public class ShuffleFlushManager {
 
@@ -91,6 +92,15 @@ public class ShuffleFlushManager {
             shuffleServerConf, storageManager, shuffleServer, 
this::processFlushEvent);
     isStorageAuditLogEnabled =
         
this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        COMMITTED_BLOCK_COUNT,
+        () ->
+            committedBlockIds.values().stream()
+                .flatMap(innerMap -> innerMap.values().stream())
+                .mapToLong(bitmap -> bitmap.getLongCardinality())
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
   }
 
   public void addToFlushQueue(ShuffleDataFlushEvent event) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index edbc659e2..22d9b7c0a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -166,6 +166,10 @@ public class ShuffleServerMetrics {
   public static final String BUFFER_COUNT_IN_BUFFER_POOL = 
"buffer_count_in_buffer_pool";
   public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = 
"shuffle_count_in_buffer_pool";
 
+  public static final String COMMITTED_BLOCK_COUNT = "committed_block_count";
+  public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
+  public static final String CACHED_BLOCK_COUNT = "cached_block_count";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5478d51c0..dbc94c007 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -89,6 +89,8 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.CACHED_BLOCK_COUNT;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.REPORTED_BLOCK_COUNT;
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUNT;
 
 public class ShuffleTaskManager {
@@ -244,6 +246,26 @@ public class ShuffleTaskManager {
     topNShuffleDataSizeOfAppCalcTask.start();
 
     ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT, 
requireBufferIds::size);
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        REPORTED_BLOCK_COUNT,
+        () ->
+            partitionsToBlockIds.values().stream()
+                .flatMap(innerMap -> innerMap.values().stream())
+                .flatMapToLong(
+                    arr ->
+                        java.util.Arrays.stream(arr)
+                            
.mapToLong(Roaring64NavigableMap::getLongCardinality))
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        CACHED_BLOCK_COUNT,
+        () ->
+            shuffleTaskInfos.values().stream()
+                .map(ShuffleTaskInfo::getCachedBlockIds)
+                .flatMap(map -> map.values().stream())
+                .mapToLong(Roaring64NavigableMap::getLongCardinality)
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
   }
 
   public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {

Reply via email to