This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new c59e2a4b [#799] feat: use storage host label for remote storage write metrics (#800)
c59e2a4b is described below

commit c59e2a4b10284534b47f5a52373cbe4e62ac6b90
Author: advancedxy <[email protected]>
AuthorDate: Fri Apr 7 10:32:33 2023 +0800

    [#799] feat: use storage host label for remote storage write metrics (#800)
    
    ### What changes were proposed in this pull request?
    1. Replace the dynamically registered per-host remote storage write counters with single counters that carry a `storage_host` label.
    
    ### Why are the changes needed?
    Fix: #799
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests, updated to read the counters through the `storage_host` label.
---
 .../uniffle/server/ShuffleServerMetrics.java       | 70 ++++++----------------
 .../uniffle/server/storage/HdfsStorageManager.java |  2 -
 .../uniffle/server/ShuffleFlushManagerTest.java    | 12 ++--
 .../uniffle/server/ShuffleServerMetricsTest.java   | 37 ++++++------
 4 files changed, 44 insertions(+), 77 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 098560d1..a5b09ff1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -17,8 +17,6 @@
 
 package org.apache.uniffle.server;
 
-import java.util.Map;
-
 import com.google.common.annotations.VisibleForTesting;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
@@ -26,8 +24,6 @@ import io.prometheus.client.Gauge;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.metrics.MetricsManager;
-import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.common.LocalStorage;
 
 public class ShuffleServerMetrics {
@@ -80,10 +76,11 @@ public class ShuffleServerMetrics {
   private static final String STORAGE_RETRY_WRITE_LOCAL = 
"storage_retry_write_local";
   private static final String STORAGE_FAILED_WRITE_LOCAL = 
"storage_failed_write_local";
   private static final String STORAGE_SUCCESS_WRITE_LOCAL = 
"storage_success_write_local";
-  public static final String STORAGE_TOTAL_WRITE_REMOTE_PREFIX = 
"storage_total_write_remote_";
-  public static final String STORAGE_RETRY_WRITE_REMOTE_PREFIX = 
"storage_retry_write_remote_";
-  public static final String STORAGE_FAILED_WRITE_REMOTE_PREFIX = 
"storage_failed_write_remote_";
-  public static final String STORAGE_SUCCESS_WRITE_REMOTE_PREFIX = 
"storage_success_write_remote_";
+  private static final String STORAGE_HOST_LABEL = "storage_host";
+  public static final String STORAGE_TOTAL_WRITE_REMOTE = 
"storage_total_write_remote";
+  public static final String STORAGE_RETRY_WRITE_REMOTE = 
"storage_retry_write_remote";
+  public static final String STORAGE_FAILED_WRITE_REMOTE = 
"storage_failed_write_remote";
+  public static final String STORAGE_SUCCESS_WRITE_REMOTE = 
"storage_success_write_remote";
 
   private static final String TOTAL_APP_NUM = "total_app_num";
   private static final String TOTAL_APP_WITH_HUGE_PARTITION_NUM = 
"total_app_with_huge_partition_num";
@@ -148,20 +145,16 @@ public class ShuffleServerMetrics {
   public static Gauge gaugeEventQueueSize;
   public static Gauge gaugeAppNum;
   public static Gauge gaugeTotalPartitionNum;
-  public static Map<String, Counter> counterRemoteStorageTotalWrite;
-  public static Map<String, Counter> counterRemoteStorageRetryWrite;
-  public static Map<String, Counter> counterRemoteStorageFailedWrite;
-  public static Map<String, Counter> counterRemoteStorageSuccessWrite;
+  public static Counter counterRemoteStorageTotalWrite;
+  public static Counter counterRemoteStorageRetryWrite;
+  public static Counter counterRemoteStorageFailedWrite;
+  public static Counter counterRemoteStorageSuccessWrite;
 
   private static MetricsManager metricsManager;
   private static boolean isRegister = false;
 
   public static synchronized void register(CollectorRegistry 
collectorRegistry) {
     if (!isRegister) {
-      counterRemoteStorageTotalWrite = JavaUtils.newConcurrentMap();
-      counterRemoteStorageRetryWrite = JavaUtils.newConcurrentMap();
-      counterRemoteStorageFailedWrite = JavaUtils.newConcurrentMap();
-      counterRemoteStorageSuccessWrite = JavaUtils.newConcurrentMap();
       metricsManager = new MetricsManager(collectorRegistry);
       isRegister = true;
       setUpMetrics();
@@ -183,43 +176,14 @@ public class ShuffleServerMetrics {
     return metricsManager.getCollectorRegistry();
   }
 
-  public static synchronized void addDynamicCounterForRemoteStorage(String 
storageHost) {
-    if (!StringUtils.isEmpty(storageHost)) {
-      String totalWriteMetricName = STORAGE_TOTAL_WRITE_REMOTE_PREFIX
-          + RssUtils.getMetricNameForHostName(storageHost);
-      if (!counterRemoteStorageTotalWrite.containsKey(storageHost)) {
-        counterRemoteStorageTotalWrite.putIfAbsent(storageHost,
-            metricsManager.addCounter(totalWriteMetricName));
-      }
-      String retryWriteMetricName = STORAGE_RETRY_WRITE_REMOTE_PREFIX
-          + RssUtils.getMetricNameForHostName(storageHost);
-      if (!counterRemoteStorageRetryWrite.containsKey(storageHost)) {
-        counterRemoteStorageRetryWrite.putIfAbsent(storageHost,
-            metricsManager.addCounter(retryWriteMetricName));
-      }
-      String failedWriteMetricName = STORAGE_FAILED_WRITE_REMOTE_PREFIX
-          + RssUtils.getMetricNameForHostName(storageHost);
-      if (!counterRemoteStorageFailedWrite.containsKey(storageHost)) {
-        counterRemoteStorageFailedWrite.putIfAbsent(storageHost,
-            metricsManager.addCounter(failedWriteMetricName));
-      }
-      String successWriteMetricName = STORAGE_SUCCESS_WRITE_REMOTE_PREFIX
-          + RssUtils.getMetricNameForHostName(storageHost);
-      if (!counterRemoteStorageSuccessWrite.containsKey(storageHost)) {
-        counterRemoteStorageSuccessWrite.putIfAbsent(storageHost,
-            metricsManager.addCounter(successWriteMetricName));
-      }
-    }
-  }
-
   public static void incStorageRetryCounter(String storageHost) {
     if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
       counterLocalStorageTotalWrite.inc();
       counterLocalStorageRetryWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.get(storageHost).inc();
-        counterRemoteStorageRetryWrite.get(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(storageHost).inc();
+        counterRemoteStorageRetryWrite.labels(storageHost).inc();
       }
     }
   }
@@ -230,8 +194,8 @@ public class ShuffleServerMetrics {
       counterLocalStorageSuccessWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.get(storageHost).inc();
-        counterRemoteStorageSuccessWrite.get(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(storageHost).inc();
+        counterRemoteStorageSuccessWrite.labels(storageHost).inc();
       }
     }
   }
@@ -242,8 +206,8 @@ public class ShuffleServerMetrics {
       counterLocalStorageFailedWrite.inc();
     } else {
       if (!StringUtils.isEmpty(storageHost)) {
-        counterRemoteStorageTotalWrite.get(storageHost).inc();
-        counterRemoteStorageFailedWrite.get(storageHost).inc();
+        counterRemoteStorageTotalWrite.labels(storageHost).inc();
+        counterRemoteStorageFailedWrite.labels(storageHost).inc();
       }
     }
   }
@@ -278,6 +242,10 @@ public class ShuffleServerMetrics {
     counterLocalStorageRetryWrite = 
metricsManager.addCounter(STORAGE_RETRY_WRITE_LOCAL);
     counterLocalStorageFailedWrite = 
metricsManager.addCounter(STORAGE_FAILED_WRITE_LOCAL);
     counterLocalStorageSuccessWrite = 
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_LOCAL);
+    counterRemoteStorageTotalWrite = 
metricsManager.addCounter(STORAGE_TOTAL_WRITE_REMOTE, STORAGE_HOST_LABEL);
+    counterRemoteStorageRetryWrite = 
metricsManager.addCounter(STORAGE_RETRY_WRITE_REMOTE, STORAGE_HOST_LABEL);
+    counterRemoteStorageFailedWrite = 
metricsManager.addCounter(STORAGE_FAILED_WRITE_REMOTE, STORAGE_HOST_LABEL);
+    counterRemoteStorageSuccessWrite = 
metricsManager.addCounter(STORAGE_SUCCESS_WRITE_REMOTE, STORAGE_HOST_LABEL);
     counterTotalRequireReadMemoryNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY);
     counterTotalRequireReadMemoryRetryNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_RETRY);
     counterTotalRequireReadMemoryFailedNum = 
metricsManager.addCounter(TOTAL_REQUIRE_READ_MEMORY_FAILED);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7fddb86a..2cebe0ca 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -131,8 +131,6 @@ public class HdfsStorageManager extends 
SingleStorageManager {
         }
       }
       HdfsStorage hdfsStorage = new HdfsStorage(remoteStorage, 
remoteStorageHadoopConf);
-      String storageHost = hdfsStorage.getStorageHost();
-      ShuffleServerMetrics.addDynamicCounterForRemoteStorage(storageHost);
       return hdfsStorage;
     });
     appIdToStorages.computeIfAbsent(appId, key -> 
pathToStorages.get(remoteStorage));
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index aa8f88b6..54f4a0b5 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -186,10 +186,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase 
{
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.registerRemoteStorage(appId, remoteStorage);
     String storageHost = cluster.getURI().getHost();
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.get(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(storageHost).get(), 
0.5);
-    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(), 
0.5);
+    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(), 
0.5);
+    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(storageHost).get(), 
0.5);
+    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(storageHost).get(), 
0.5);
+    assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
 0.5);
     ShuffleFlushManager manager =
         new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 =
@@ -214,8 +214,8 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath());
     assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId, 
2).getLongCardinality());
 
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(), 
0.5);
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(), 
0.5);
+    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(storageHost).get(), 
0.5);
+    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(storageHost).get(),
 0.5);
 
     // test case for process event whose related app was cleared already
     assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 99993eb4..93f48f8a 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -37,6 +37,7 @@ import org.apache.uniffle.common.metrics.TestUtils;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -85,32 +86,32 @@ public class ShuffleServerMetricsTest {
 
   @Test
   public void testServerMetrics() throws Exception {
+    
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).inc(0);
+    
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).inc(0);
+    
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).inc(0);
+    
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).inc(0);
     String content = TestUtils.httpGet(SERVER_METRICS_URL);
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
     JsonNode metricsNode = actualObj.get("metrics");
-
     List<String> expectedMetricNames = Lists.newArrayList(
-        ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE_PREFIX + STORAGE_HOST,
-        ShuffleServerMetrics.STORAGE_SUCCESS_WRITE_REMOTE_PREFIX + 
STORAGE_HOST,
-        ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE_PREFIX + STORAGE_HOST,
-        ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE_PREFIX + STORAGE_HOST);
+        ShuffleServerMetrics.STORAGE_TOTAL_WRITE_REMOTE,
+        ShuffleServerMetrics.STORAGE_SUCCESS_WRITE_REMOTE,
+        ShuffleServerMetrics.STORAGE_FAILED_WRITE_REMOTE,
+        ShuffleServerMetrics.STORAGE_RETRY_WRITE_REMOTE);
     for (String expectMetricName : expectedMetricNames) {
-      validateMetrics(metricsNode, expectMetricName);
+      validateMetrics(mapper, metricsNode, expectMetricName, STORAGE_HOST);
     }
-
-    // for duplicate register, IllegalArgumentException shouldn't be thrown
-    String hostName = "duplicateHost";
-    ShuffleServerMetrics.addDynamicCounterForRemoteStorage(hostName);
-    ShuffleServerMetrics.addDynamicCounterForRemoteStorage(hostName);
   }
 
-  private void validateMetrics(JsonNode metricsNode, String 
expectedMetricName) {
+  private void validateMetrics(ObjectMapper mapper, JsonNode metricsNode, 
String expectedMetricName, String... labels) {
     boolean bingo = false;
     for (int i = 0; i < metricsNode.size(); i++) {
       JsonNode metricsName = metricsNode.get(i).get("name");
       if (expectedMetricName.equals(metricsName.textValue())) {
+        List<String> labelValues = 
mapper.convertValue(metricsNode.get(i).get("labelValues"), ArrayList.class);
+        assertArrayEquals(labels, labelValues.toArray(new String[0]));
         bingo = true;
         break;
       }
@@ -133,14 +134,14 @@ public class ShuffleServerMetricsTest {
 
     // test for remote storage
     ShuffleServerMetrics.incStorageRetryCounter(STORAGE_HOST);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.get(STORAGE_HOST).get(), 
0.5);
+    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
+    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageRetryWrite.labels(STORAGE_HOST).get(), 
0.5);
     ShuffleServerMetrics.incStorageSuccessCounter(STORAGE_HOST);
-    assertEquals(2.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(STORAGE_HOST).get(), 
0.5);
+    assertEquals(2.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
+    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.labels(STORAGE_HOST).get(),
 0.5);
     ShuffleServerMetrics.incStorageFailedCounter(STORAGE_HOST);
-    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(STORAGE_HOST).get(), 
0.5);
-    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(STORAGE_HOST).get(), 
0.5);
+    assertEquals(3.0, 
ShuffleServerMetrics.counterRemoteStorageTotalWrite.labels(STORAGE_HOST).get(), 
0.5);
+    assertEquals(1.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.labels(STORAGE_HOST).get(),
 0.5);
   }
 
   @Test

Reply via email to