This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dfa2a7e8 Refactor metrics system to reduce periodic reporting load (#1991)
8dfa2a7e8 is described below

commit 8dfa2a7e8b486febce3dbedfe16cc65932163a77
Author: kqhzz <kuangq...@gmail.com>
AuthorDate: Wed Aug 14 20:39:24 2024 +0800

    Refactor metrics system to reduce periodic reporting load (#1991)
    
    ### What changes were proposed in this pull request?
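    Add an `addLabeledGauge(String, Supplier<Double>)` overload so that a gauge
    metric can be described with a lambda. The supplier is evaluated each time the
    gauge is collected, so the direct-memory and event-queue-size gauges no longer
    need a periodic reporter thread or manual inc/dec bookkeeping.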
    
    ### Why are the changes needed?
    Fix: #1973
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UTs.
---
 .../uniffle/common/metrics/MetricsManager.java     | 32 +++++++++
 .../uniffle/common/metrics/SupplierGauge.java      | 66 +++++++++++++++++
 .../uniffle/server/DefaultFlushEventHandler.java   |  9 ++-
 .../uniffle/server/NettyDirectMemoryTracker.java   | 82 ----------------------
 .../org/apache/uniffle/server/ShuffleServer.java   | 25 +++++--
 .../uniffle/server/ShuffleServerMetrics.java       | 23 +++---
 6 files changed, 129 insertions(+), 108 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index e2ae3106c..b26c055c7 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.metrics;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
@@ -27,12 +28,15 @@ import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
 import io.prometheus.client.Summary;
 
+import org.apache.uniffle.common.util.JavaUtils;
+
 public class MetricsManager {
   private final CollectorRegistry collectorRegistry;
   private final String[] defaultLabelNames;
   private final String[] defaultLabelValues;
   private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
   private static final double QUANTILE_ERROR = 0.01;
+  private Map<String, SupplierGauge> supplierGaugeMap;
 
   public MetricsManager() {
     this(null, Maps.newHashMap());
@@ -47,6 +51,7 @@ public class MetricsManager {
     this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]);
     this.defaultLabelValues =
         
Arrays.stream(defaultLabelNames).map(defaultLabels::get).toArray(String[]::new);
+    this.supplierGaugeMap = JavaUtils.newConcurrentMap();
   }
 
   public CollectorRegistry getCollectorRegistry() {
@@ -79,6 +84,19 @@ public class MetricsManager {
     return c.labels(this.defaultLabelValues);
   }
 
+  public void addLabeledGauge(String name, Supplier<Double> supplier) {
+    supplierGaugeMap.computeIfAbsent(
+        name,
+        metricName ->
+            new SupplierGauge(
+                    name,
+                    "Gauge " + name,
+                    supplier,
+                    this.defaultLabelNames,
+                    this.defaultLabelValues)
+                .register(collectorRegistry));
+  }
+
   public Histogram addHistogram(String name, double[] buckets, String... 
labels) {
     return addHistogram(name, "Histogram " + name, buckets, labels);
   }
@@ -112,4 +130,18 @@ public class MetricsManager {
     }
     return builder.register(collectorRegistry).labels(defaultLabelValues);
   }
+
+  public void unregisterAllSupplierGauge() {
+    for (SupplierGauge gauge : supplierGaugeMap.values()) {
+      collectorRegistry.unregister(gauge);
+    }
+    supplierGaugeMap.clear();
+  }
+
+  public void unregisterSupplierGauge(String name) {
+    if (supplierGaugeMap.containsKey(name)) {
+      collectorRegistry.unregister(supplierGaugeMap.get(name));
+      supplierGaugeMap.remove(name);
+    }
+  }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
new file mode 100644
index 000000000..674980def
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+
+class SupplierGauge extends Collector implements Collector.Describable {
+  private String name;
+  private String help;
+  private Supplier<Double> supplier;
+  private List<String> labelNames;
+  private List<String> labelValues;
+
+  SupplierGauge(
+      String name,
+      String help,
+      Supplier<Double> supplier,
+      String[] labelNames,
+      String[] labelValues) {
+    this.name = name;
+    this.help = help;
+    this.supplier = supplier;
+    this.labelNames = Arrays.asList(labelNames);
+    this.labelValues = Arrays.asList(labelValues);
+  }
+
+  @Override
+  public List<MetricFamilySamples> collect() {
+    List<MetricFamilySamples.Sample> samples = new ArrayList<>();
+    samples.add(
+        new MetricFamilySamples.Sample(
+            this.name, this.labelNames, this.labelValues, 
this.supplier.get()));
+    MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, 
this.help, samples);
+    List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1);
+    mfsList.add(mfs);
+    return mfsList;
+  }
+
+  @Override
+  public List<MetricFamilySamples> describe() {
+    return Collections.<MetricFamilySamples>singletonList(
+        new GaugeMetricFamily(this.name, this.help, this.labelNames));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index f32a96101..e0a47526f 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -41,6 +41,8 @@ import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE;
+
 public class DefaultFlushEventHandler implements FlushEventHandler {
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultFlushEventHandler.class);
 
@@ -77,8 +79,6 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
       // We need to release the memory when discarding the event
       event.doCleanup();
       ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
-    } else {
-      ShuffleServerMetrics.gaugeEventQueueSize.inc();
     }
   }
 
@@ -160,8 +160,6 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
       } else {
         ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
       }
-
-      ShuffleServerMetrics.gaugeEventQueueSize.dec();
     }
   }
 
@@ -178,6 +176,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
       hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, 
"HadoopFlushEventThreadPool");
     }
     fallbackThreadPoolExecutor = createFlushEventExecutor(5, 
"FallBackFlushEventThreadPool");
+    ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double) 
flushQueue.size());
     startEventProcessor();
   }
 
@@ -248,7 +247,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
 
   @Override
   public int getEventNumInFlush() {
-    return (int) ShuffleServerMetrics.gaugeEventQueueSize.get();
+    return flushQueue.size();
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java 
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
deleted file mode 100644
index e9eb17060..000000000
--- 
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.server;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.util.internal.PlatformDependent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.util.ThreadUtils;
-
-public class NettyDirectMemoryTracker {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(NettyDirectMemoryTracker.class);
-
-  private final long reportInitialDelay;
-  private final long reportInterval;
-  private final ScheduledExecutorService service =
-      Executors.newSingleThreadScheduledExecutor(
-          ThreadUtils.getThreadFactory("NettyDirectMemoryTracker"));
-
-  public NettyDirectMemoryTracker(ShuffleServerConf conf) {
-    this.reportInitialDelay =
-        
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY);
-    this.reportInterval =
-        
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL);
-  }
-
-  public void start() {
-    LOG.info(
-        "Start report direct memory usage to MetricSystem after {}ms and 
interval is {}ms",
-        reportInitialDelay,
-        reportInterval);
-
-    service.scheduleAtFixedRate(
-        () -> {
-          try {
-            long usedDirectMemoryByNetty = 
PlatformDependent.usedDirectMemory();
-            long usedDirectMemoryByGrpcNetty =
-                
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Current usedDirectMemoryByNetty:{}, 
usedDirectMemoryByGrpcNetty:{}",
-                  usedDirectMemoryByNetty,
-                  usedDirectMemoryByGrpcNetty);
-            }
-            
ShuffleServerMetrics.gaugeUsedDirectMemorySizeByNetty.set(usedDirectMemoryByNetty);
-            ShuffleServerMetrics.gaugeUsedDirectMemorySizeByGrpcNetty.set(
-                usedDirectMemoryByGrpcNetty);
-            ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(
-                usedDirectMemoryByNetty + usedDirectMemoryByGrpcNetty);
-          } catch (Throwable t) {
-            LOG.error("Failed to report direct memory.", t);
-          }
-        },
-        reportInitialDelay,
-        reportInterval,
-        TimeUnit.MILLISECONDS);
-  }
-
-  public void stop() {
-    service.shutdownNow();
-  }
-}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 60be04b65..4b59cb5ce 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.netty.util.internal.PlatformDependent;
 import io.prometheus.client.CollectorRegistry;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -73,13 +74,15 @@ import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE;
 import static 
org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_NETTY;
 
 /** Server that manages startup/shutdown of a {@code Greeter} server. */
 public class ShuffleServer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleServer.class);
   private RegisterHeartBeat registerHeartBeat;
-  private NettyDirectMemoryTracker directMemoryUsageReporter;
   private String id;
   private String ip;
   private int grpcPort;
@@ -156,7 +159,6 @@ public class ShuffleServer {
     initMetricsReporter();
 
     registerHeartBeat.startHeartBeat();
-    directMemoryUsageReporter.start();
     Runtime.getRuntime()
         .addShutdownHook(
             new Thread() {
@@ -184,10 +186,6 @@ public class ShuffleServer {
       registerHeartBeat.shutdown();
       LOG.info("HeartBeat Stopped!");
     }
-    if (directMemoryUsageReporter != null) {
-      directMemoryUsageReporter.stop();
-      LOG.info("Direct memory usage tracker Stopped!");
-    }
     if (storageManager != null) {
       storageManager.stop();
       LOG.info("MultiStorage Stopped!");
@@ -304,7 +302,6 @@ public class ShuffleServer {
     }
 
     registerHeartBeat = new RegisterHeartBeat(this);
-    directMemoryUsageReporter = new 
NettyDirectMemoryTracker(shuffleServerConf);
     shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, 
storageManager);
     shuffleBufferManager =
         new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, 
nettyServerEnabled);
@@ -320,6 +317,20 @@ public class ShuffleServer {
             storageManager,
             shuffleMergeManager);
     shuffleTaskManager.start();
+    ShuffleServerMetrics.addLabeledGauge(
+        USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double) 
PlatformDependent.usedDirectMemory());
+    ShuffleServerMetrics.addLabeledGauge(
+        USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
+        () ->
+            (double)
+                
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory());
+    ShuffleServerMetrics.addLabeledGauge(
+        USED_DIRECT_MEMORY_SIZE,
+        () ->
+            (double)
+                (PlatformDependent.usedDirectMemory()
+                    + 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
+                        .usedDirectMemory()));
 
     setServer();
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 3e886407c..b3e56c0b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
@@ -57,7 +58,7 @@ public class ShuffleServerMetrics {
   private static final String EVENT_SIZE_THRESHOLD_LEVEL2 = 
"event_size_threshold_level2";
   private static final String EVENT_SIZE_THRESHOLD_LEVEL3 = 
"event_size_threshold_level3";
   private static final String EVENT_SIZE_THRESHOLD_LEVEL4 = 
"event_size_threshold_level4";
-  private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+  public static final String EVENT_QUEUE_SIZE = "event_queue_size";
   private static final String MERGE_EVENT_QUEUE_SIZE = 
"merge_event_queue_size";
   private static final String HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE =
       "hadoop_flush_thread_pool_queue_size";
@@ -98,9 +99,9 @@ public class ShuffleServerMetrics {
   private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
   private static final String USED_BUFFER_SIZE = "used_buffer_size";
   private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
-  private static final String USED_DIRECT_MEMORY_SIZE = 
"used_direct_memory_size";
-  private static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = 
"used_direct_memory_size_by_netty";
-  private static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY =
+  public static final String USED_DIRECT_MEMORY_SIZE = 
"used_direct_memory_size";
+  public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = 
"used_direct_memory_size_by_netty";
+  public static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY =
       "used_direct_memory_size_by_grpc_netty";
   private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM = 
"total_failed_written_event_num";
   private static final String TOTAL_DROPPED_EVENT_NUM = 
"total_dropped_event_num";
@@ -218,11 +219,7 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeInFlushBufferSize;
   public static Gauge.Child gaugeUsedBufferSize;
   public static Gauge.Child gaugeReadBufferUsedSize;
-  public static Gauge.Child gaugeUsedDirectMemorySize;
-  public static Gauge.Child gaugeUsedDirectMemorySizeByNetty;
-  public static Gauge.Child gaugeUsedDirectMemorySizeByGrpcNetty;
   public static Gauge.Child gaugeWriteHandler;
-  public static Gauge.Child gaugeEventQueueSize;
   public static Gauge.Child gaugeMergeEventQueueSize;
   public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
   public static Gauge.Child gaugeLocalfileFlushThreadPoolQueueSize;
@@ -449,13 +446,7 @@ public class ShuffleServerMetrics {
     gaugeInFlushBufferSize = 
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
     gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
     gaugeReadBufferUsedSize = 
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
-    gaugeUsedDirectMemorySize = 
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
-    gaugeUsedDirectMemorySizeByNetty =
-        metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_NETTY);
-    gaugeUsedDirectMemorySizeByGrpcNetty =
-        metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY);
     gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
-    gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
     gaugeMergeEventQueueSize = 
metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE);
     gaugeHadoopFlushThreadPoolQueueSize =
         metricsManager.addLabeledGauge(HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE);
@@ -521,4 +512,8 @@ public class ShuffleServerMetrics {
             .labelNames("app_id")
             .register(metricsManager.getCollectorRegistry());
   }
+
+  public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+    metricsManager.addLabeledGauge(name, supplier);
+  }
 }

Reply via email to