This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b5290b217 [#2174] feat(remote merge): Add metric for sorted api. 
(#2218)
b5290b217 is described below

commit b5290b21723b8698ea6dbaaddf3b2e4da7ee966a
Author: zhengchenyu <[email protected]>
AuthorDate: Wed Oct 23 21:02:44 2024 +0800

    [#2174] feat(remote merge): Add metric for sorted api. (#2218)
    
    ### What changes were proposed in this pull request?
    
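    Add gRPC and Netty metrics for the sorted shuffle data API
    (getSortedShuffleData), and record its transport and process time under
    GET_SORTED_SHUFFLE_DATA_METHOD instead of GET_SHUFFLE_DATA_METHOD.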
    
    ### Why are the changes needed?
    
    Fix: #2174
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
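    Tested in a cluster. The expected metric counts in
    ShuffleServerGrpcMetricsTest and ShuffleServerMetricsTest were updated
    to match the newly registered metrics.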
---
 .../uniffle/server/ShuffleServerGrpcMetrics.java   | 27 ++++++++++++++++++++++
 .../uniffle/server/ShuffleServerGrpcService.java   |  5 ++--
 .../uniffle/server/ShuffleServerNettyMetrics.java  | 16 +++++++++++++
 .../server/ShuffleServerGrpcMetricsTest.java       |  4 ++--
 .../uniffle/server/ShuffleServerMetricsTest.java   |  4 ++--
 5 files changed, 50 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
index 929e03d48..8636792c6 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
@@ -34,6 +34,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
   public static final String GET_SHUFFLE_INDEX_METHOD = "getLocalShuffleIndex";
   public static final String GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD =
       "getShuffleResultForMultiPart";
+  public static final String GET_SORTED_SHUFFLE_DATA_METHOD = 
"getSortedShuffleData";
+  public static final String REPORT_UNIQUE_BLOCKS_METHOD = 
"reportUniqueBlocks";
 
   private static final String GRPC_REGISTERED_SHUFFLE = 
"grpc_registered_shuffle";
   private static final String GRPC_SEND_SHUFFLE_DATA = 
"grpc_send_shuffle_data";
@@ -46,6 +48,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
   private static final String GRPC_GET_SHUFFLE_DATA = 
"grpc_get_local_shuffle_data";
   private static final String GRPC_GET_MEMORY_SHUFFLE_DATA = 
"grpc_get_memory_shuffle_data";
   private static final String GRPC_GET_SHUFFLE_INDEX = 
"grpc_get_local_shuffle_index";
+  private static final String GRPC_GET_SORTED_SHUFFLE_DATA = 
"grpc_get_sorted_shuffle_data";
+  private static final String GRPC_START_SORT_MERGE = "grpc_start_sort_merge";
 
   private static final String GRPC_REGISTERED_SHUFFLE_TOTAL = 
"grpc_registered_shuffle_total";
   private static final String GRPC_SEND_SHUFFLE_DATA_TOTAL = 
"grpc_send_shuffle_data_total";
@@ -61,6 +65,9 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
   private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = 
"grpc_get_local_shuffle_index_total";
   private static final String GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL =
       "grpc_get_shuffle_result_for_multi_part_total";
+  private static final String GRPC_GET_SORTED_SHUFFLE_DATA_TOTAL =
+      "grpc_get_sorted_shuffle_data_total";
+  private static final String GRPC_START_SORT_MERGE_TOTAL = 
"grpc_start_sort_merge_total";
 
   private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY =
       "grpc_send_shuffle_data_transport_latency";
@@ -68,6 +75,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
       "grpc_get_local_shuffle_data_transport_latency";
   private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY =
       "grpc_get_memory_shuffle_data_transport_latency";
+  private static final String GRPC_GET_SORTED_SHUFFLE_DATA_TRANSPORT_LATENCY =
+      "grpc_get_sorted_shuffle_data_transport_latency";
 
   private static final String GRPC_SEND_SHUFFLE_DATA_PROCESS_LATENCY =
       "grpc_send_shuffle_data_process_latency";
@@ -79,6 +88,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
       "grpc_get_shuffle_result_for_multi_part_process_latency";
   private static final String GRPC_REPORT_SHUFFLE_RESULT_PROCESS_LATENCY =
       "grpc_report_shuffle_result_process_latency";
+  private static final String GRPC_GET_SORTED_SHUFFLE_DATA_PROCESS_LATENCY =
+      "grpc_get_sorted_shuffle_data_process_latency";
 
   public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String 
tags) {
     super(shuffleServerConf, tags);
@@ -108,6 +119,11 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
         metricsManager.addLabeledGauge(GRPC_GET_MEMORY_SHUFFLE_DATA));
     gaugeMap.putIfAbsent(
         GET_SHUFFLE_INDEX_METHOD, 
metricsManager.addLabeledGauge(GRPC_GET_SHUFFLE_INDEX));
+    gaugeMap.putIfAbsent(
+        GET_SORTED_SHUFFLE_DATA_METHOD,
+        metricsManager.addLabeledGauge(GRPC_GET_SORTED_SHUFFLE_DATA));
+    gaugeMap.putIfAbsent(
+        REPORT_UNIQUE_BLOCKS_METHOD, 
metricsManager.addLabeledGauge(GRPC_START_SORT_MERGE));
 
     counterMap.putIfAbsent(
         REGISTER_SHUFFLE_METHOD, 
metricsManager.addLabeledCounter(GRPC_REGISTERED_SHUFFLE_TOTAL));
@@ -137,6 +153,11 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
     counterMap.putIfAbsent(
         GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD,
         
metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL));
+    counterMap.putIfAbsent(
+        GET_SORTED_SHUFFLE_DATA_METHOD,
+        metricsManager.addLabeledCounter(GRPC_GET_SORTED_SHUFFLE_DATA_TOTAL));
+    counterMap.putIfAbsent(
+        REPORT_UNIQUE_BLOCKS_METHOD, 
metricsManager.addLabeledCounter(GRPC_START_SORT_MERGE_TOTAL));
 
     transportTimeSummaryMap.putIfAbsent(
         SEND_SHUFFLE_DATA_METHOD,
@@ -147,6 +168,9 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
     transportTimeSummaryMap.putIfAbsent(
         GET_MEMORY_SHUFFLE_DATA_METHOD,
         
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_TRANSPORT_LATENCY));
+    transportTimeSummaryMap.putIfAbsent(
+        GET_SORTED_SHUFFLE_DATA_METHOD,
+        
metricsManager.addLabeledSummary(GRPC_GET_SORTED_SHUFFLE_DATA_TRANSPORT_LATENCY));
 
     processTimeSummaryMap.putIfAbsent(
         SEND_SHUFFLE_DATA_METHOD,
@@ -163,5 +187,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
     processTimeSummaryMap.putIfAbsent(
         GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD,
         
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_PROCESS_LATENCY));
+    processTimeSummaryMap.putIfAbsent(
+        GET_SORTED_SHUFFLE_DATA_METHOD,
+        
metricsManager.addLabeledSummary(GRPC_GET_SORTED_SHUFFLE_DATA_PROCESS_LATENCY));
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 460b64d69..695b63591 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -1501,7 +1501,8 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
         if (transportTime > 0) {
           shuffleServer
               .getGrpcMetrics()
-              
.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 
transportTime);
+              .recordTransportTime(
+                  ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, 
transportTime);
         }
       }
       StatusCode status = StatusCode.SUCCESS;
@@ -1604,7 +1605,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
           
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
           shuffleServer
               .getGrpcMetrics()
-              
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
+              
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, 
readTime);
           LOG.info(
               "Successfully getSortedShuffleData cost {} ms for shuffle"
                   + " data with {}, length is {}, state is {}",
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
index b8a3aed24..fff636e27 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
@@ -21,6 +21,7 @@ import org.apache.uniffle.common.metrics.NettyMetrics;
 import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
 import org.apache.uniffle.common.netty.protocol.GetLocalShuffleIndexRequest;
 import org.apache.uniffle.common.netty.protocol.GetMemoryShuffleDataRequest;
+import org.apache.uniffle.common.netty.protocol.GetSortedShuffleDataRequest;
 import org.apache.uniffle.common.netty.protocol.SendShuffleDataRequest;
 
 public class ShuffleServerNettyMetrics extends NettyMetrics {
@@ -35,6 +36,8 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
       "netty_get_local_shuffle_index_request";
   private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
       "netty_get_memory_shuffle_data_request";
+  private static final String NETTY_GET_SORTED_SHUFFLE_DATA_REQUEST =
+      "netty_get_sorted_shuffle_data_request";
 
   public ShuffleServerNettyMetrics(ShuffleServerConf shuffleServerConf, String 
tags) {
     super(shuffleServerConf, tags);
@@ -54,6 +57,9 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
     gaugeMap.putIfAbsent(
         GetMemoryShuffleDataRequest.class.getName(),
         metricsManager.addLabeledGauge(NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST));
+    gaugeMap.putIfAbsent(
+        GetSortedShuffleDataRequest.class.getName(),
+        metricsManager.addLabeledGauge(NETTY_GET_SORTED_SHUFFLE_DATA_REQUEST));
 
     counterMap.putIfAbsent(
         SendShuffleDataRequest.class.getName(),
@@ -67,6 +73,9 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
     counterMap.putIfAbsent(
         GetMemoryShuffleDataRequest.class.getName(),
         metricsManager.addLabeledCounter(NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST 
+ _TOTAL));
+    counterMap.putIfAbsent(
+        GetSortedShuffleDataRequest.class.getName(),
+        metricsManager.addLabeledCounter(NETTY_GET_SORTED_SHUFFLE_DATA_REQUEST 
+ _TOTAL));
 
     transportTimeSummaryMap.putIfAbsent(
         SendShuffleDataRequest.class.getName(),
@@ -81,6 +90,10 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
         GetMemoryShuffleDataRequest.class.getName(),
         metricsManager.addLabeledSummary(
             NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST + _TRANSPORT_LATENCY));
+    transportTimeSummaryMap.putIfAbsent(
+        GetSortedShuffleDataRequest.class.getName(),
+        metricsManager.addLabeledSummary(
+            NETTY_GET_SORTED_SHUFFLE_DATA_REQUEST + _TRANSPORT_LATENCY));
 
     processTimeSummaryMap.putIfAbsent(
         SendShuffleDataRequest.class.getName(),
@@ -94,5 +107,8 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
     processTimeSummaryMap.putIfAbsent(
         GetMemoryShuffleDataRequest.class.getName(),
         metricsManager.addLabeledSummary(NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST 
+ _PROCESS_LATENCY));
+    processTimeSummaryMap.putIfAbsent(
+        GetSortedShuffleDataRequest.class.getName(),
+        metricsManager.addLabeledSummary(NETTY_GET_SORTED_SHUFFLE_DATA_REQUEST 
+ _PROCESS_LATENCY));
   }
 }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
index 033494349..53a079ef7 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
@@ -42,8 +42,8 @@ public class ShuffleServerGrpcMetricsTest {
     
metrics.recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD,
 200);
     Map<String, Summary.Child> sendTimeSummaryTime = 
metrics.getTransportTimeSummaryMap();
     Map<String, Summary.Child> processTimeSummaryTime = 
metrics.getProcessTimeSummaryMap();
-    assertEquals(3, sendTimeSummaryTime.size());
-    assertEquals(5, processTimeSummaryTime.size());
+    assertEquals(4, sendTimeSummaryTime.size());
+    assertEquals(6, processTimeSummaryTime.size());
 
     Thread.sleep(1000L);
     assertEquals(
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 2e960b74b..478a7cd2d 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -231,7 +231,7 @@ public class ShuffleServerMetricsTest {
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
-    assertEquals(84, actualObj.get("metrics").size());
+    assertEquals(102, actualObj.get("metrics").size());
   }
 
   @Test
@@ -240,7 +240,7 @@ public class ShuffleServerMetricsTest {
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
-    assertEquals(68, actualObj.get("metrics").size());
+    assertEquals(84, actualObj.get("metrics").size());
   }
 
   @Test

Reply via email to