This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6b938195b [MINOR] feat(server): Introduce metrics related
bitmaps(committedBlockIds/cachedBlockIds/partitionToBlockIds) (#2186)
6b938195b is described below
commit 6b938195b0986afd04f0c882ef031570960d560c
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 24 11:30:33 2024 +0800
[MINOR] feat(server): Introduce metrics related
bitmaps(committedBlockIds/cachedBlockIds/partitionToBlockIds) (#2186)
### What changes were proposed in this pull request?
Introduce metrics for the block-ID bitmaps (committedBlockIds, cachedBlockIds and partitionToBlockIds).
### Why are the changes needed?
Provide insight into the number of block IDs tracked in these bitmaps.
Fix: # (issue)
### Does this PR introduce _any_ user-facing change?
Yes. The following new gauge metrics are added:
- committed_block_count
- reported_block_count
- cached_block_count
### How was this patch tested?
Tested locally (see the screenshot below).
<img width="2512" alt="image"
src="https://github.com/user-attachments/assets/c57706db-a13f-44f5-93fd-235825122f5d">
---
.../apache/uniffle/server/ShuffleFlushManager.java | 10 ++++++++++
.../uniffle/server/ShuffleServerMetrics.java | 4 ++++
.../apache/uniffle/server/ShuffleTaskManager.java | 22 ++++++++++++++++++++++
3 files changed, 36 insertions(+)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 7d62bb96c..61fdc466a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -48,6 +48,7 @@ import
org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.COMMITTED_BLOCK_COUNT;
public class ShuffleFlushManager {
@@ -91,6 +92,15 @@ public class ShuffleFlushManager {
shuffleServerConf, storageManager, shuffleServer,
this::processFlushEvent);
isStorageAuditLogEnabled =
this.shuffleServerConf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ COMMITTED_BLOCK_COUNT,
+ () ->
+ committedBlockIds.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .mapToLong(bitmap -> bitmap.getLongCardinality())
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
}
public void addToFlushQueue(ShuffleDataFlushEvent event) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index edbc659e2..22d9b7c0a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -166,6 +166,10 @@ public class ShuffleServerMetrics {
public static final String BUFFER_COUNT_IN_BUFFER_POOL =
"buffer_count_in_buffer_pool";
public static final String SHUFFLE_COUNT_IN_BUFFER_POOL =
"shuffle_count_in_buffer_pool";
+ public static final String COMMITTED_BLOCK_COUNT = "committed_block_count";
+ public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
+ public static final String CACHED_BLOCK_COUNT = "cached_block_count";
+
public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5478d51c0..dbc94c007 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -89,6 +89,8 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import static
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.CACHED_BLOCK_COUNT;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.REPORTED_BLOCK_COUNT;
import static
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUNT;
public class ShuffleTaskManager {
@@ -244,6 +246,26 @@ public class ShuffleTaskManager {
topNShuffleDataSizeOfAppCalcTask.start();
ShuffleServerMetrics.addLabeledGauge(REQUIRE_BUFFER_COUNT,
requireBufferIds::size);
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ REPORTED_BLOCK_COUNT,
+ () ->
+ partitionsToBlockIds.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .flatMapToLong(
+ arr ->
+ java.util.Arrays.stream(arr)
+
.mapToLong(Roaring64NavigableMap::getLongCardinality))
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ CACHED_BLOCK_COUNT,
+ () ->
+ shuffleTaskInfos.values().stream()
+ .map(ShuffleTaskInfo::getCachedBlockIds)
+ .flatMap(map -> map.values().stream())
+ .mapToLong(Roaring64NavigableMap::getLongCardinality)
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
}
public ReentrantReadWriteLock.WriteLock getAppWriteLock(String appId) {