This is an automated email from the ASF dual-hosted git repository. xianjin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 8dfa2a7e8 Refactor metrics system to reduce periodic reporting load (#1991) 8dfa2a7e8 is described below commit 8dfa2a7e8b486febce3dbedfe16cc65932163a77 Author: kqhzz <kuangq...@gmail.com> AuthorDate: Wed Aug 14 20:39:24 2024 +0800 Refactor metrics system to reduce periodic reporting load (#1991) ### What changes were proposed in this pull request? Add another method to add gauge metric, we can use lambda to describe a gauge metric. ### Why are the changes needed? Fix: #1973 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UTs. --- .../uniffle/common/metrics/MetricsManager.java | 32 +++++++++ .../uniffle/common/metrics/SupplierGauge.java | 66 +++++++++++++++++ .../uniffle/server/DefaultFlushEventHandler.java | 9 ++- .../uniffle/server/NettyDirectMemoryTracker.java | 82 ---------------------- .../org/apache/uniffle/server/ShuffleServer.java | 25 +++++-- .../uniffle/server/ShuffleServerMetrics.java | 23 +++--- 6 files changed, 129 insertions(+), 108 deletions(-) diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java index e2ae3106c..b26c055c7 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java @@ -19,6 +19,7 @@ package org.apache.uniffle.common.metrics; import java.util.Arrays; import java.util.Map; +import java.util.function.Supplier; import com.google.common.collect.Maps; import io.prometheus.client.CollectorRegistry; @@ -27,12 +28,15 @@ import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; import io.prometheus.client.Summary; +import org.apache.uniffle.common.util.JavaUtils; + public class MetricsManager { private final CollectorRegistry collectorRegistry; private final String[] defaultLabelNames; private final String[] defaultLabelValues; private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99}; private static final double QUANTILE_ERROR = 0.01; + private Map<String, SupplierGauge> supplierGaugeMap; public MetricsManager() { this(null, Maps.newHashMap()); @@ -47,6 +51,7 @@ public class MetricsManager { this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]); this.defaultLabelValues = Arrays.stream(defaultLabelNames).map(defaultLabels::get).toArray(String[]::new); + this.supplierGaugeMap = JavaUtils.newConcurrentMap(); } public CollectorRegistry getCollectorRegistry() { @@ -79,6 +84,19 @@ public class MetricsManager { return c.labels(this.defaultLabelValues); } + public void addLabeledGauge(String name, Supplier<Double> supplier) { + supplierGaugeMap.computeIfAbsent( + name, + metricName -> + new SupplierGauge( + name, + "Gauge " + name, + supplier, + this.defaultLabelNames, + this.defaultLabelValues) + .register(collectorRegistry)); + } + public Histogram addHistogram(String name, double[] buckets, String... labels) { return addHistogram(name, "Histogram " + name, buckets, labels); } @@ -112,4 +130,18 @@ public class MetricsManager { } return builder.register(collectorRegistry).labels(defaultLabelValues); } + + public void unregisterAllSupplierGauge() { + for (SupplierGauge gauge : supplierGaugeMap.values()) { + collectorRegistry.unregister(gauge); + } + supplierGaugeMap.clear(); + } + + public void unregisterSupplierGauge(String name) { + if (supplierGaugeMap.containsKey(name)) { + collectorRegistry.unregister(supplierGaugeMap.get(name)); + supplierGaugeMap.remove(name); + } + } } diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java new file mode 100644 index 000000000..674980def --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +import io.prometheus.client.Collector; +import io.prometheus.client.GaugeMetricFamily; + +class SupplierGauge extends Collector implements Collector.Describable { + private String name; + private String help; + private Supplier<Double> supplier; + private List<String> labelNames; + private List<String> labelValues; + + SupplierGauge( + String name, + String help, + Supplier<Double> supplier, + String[] labelNames, + String[] labelValues) { + this.name = name; + this.help = help; + this.supplier = supplier; + this.labelNames = Arrays.asList(labelNames); + this.labelValues = Arrays.asList(labelValues); + } + + @Override + public List<MetricFamilySamples> collect() { + List<MetricFamilySamples.Sample> samples = new ArrayList<>(); + samples.add( + new MetricFamilySamples.Sample( + this.name, this.labelNames, this.labelValues, this.supplier.get())); + MetricFamilySamples mfs = new MetricFamilySamples(this.name, Type.GAUGE, this.help, samples); + List<MetricFamilySamples> mfsList = new ArrayList<MetricFamilySamples>(1); + mfsList.add(mfs); + return mfsList; + } + + @Override + public List<MetricFamilySamples> describe() { + return Collections.<MetricFamilySamples>singletonList( + new GaugeMetricFamily(this.name, this.help, this.labelNames)); + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java index f32a96101..e0a47526f 100644 --- a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java @@ -41,6 +41,8 @@ import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; +import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE; + public class DefaultFlushEventHandler implements FlushEventHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultFlushEventHandler.class); @@ -77,8 +79,6 @@ public class DefaultFlushEventHandler implements FlushEventHandler { // We need to release the memory when discarding the event event.doCleanup(); ShuffleServerMetrics.counterTotalDroppedEventNum.inc(); - } else { - ShuffleServerMetrics.gaugeEventQueueSize.inc(); } } @@ -160,8 +160,6 @@ public class DefaultFlushEventHandler implements FlushEventHandler { } else { ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec(); } - - ShuffleServerMetrics.gaugeEventQueueSize.dec(); } } @@ -178,6 +176,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler { hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, "HadoopFlushEventThreadPool"); } fallbackThreadPoolExecutor = createFlushEventExecutor(5, "FallBackFlushEventThreadPool"); + ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double) flushQueue.size()); startEventProcessor(); } @@ -248,7 +247,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler { @Override public int getEventNumInFlush() { - return (int) ShuffleServerMetrics.gaugeEventQueueSize.get(); + return flushQueue.size(); } @Override diff --git a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java deleted file mode 100644 index e9eb17060..000000000 --- a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.server; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.netty.util.internal.PlatformDependent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.util.ThreadUtils; - -public class NettyDirectMemoryTracker { - - private static final Logger LOG = LoggerFactory.getLogger(NettyDirectMemoryTracker.class); - - private final long reportInitialDelay; - private final long reportInterval; - private final ScheduledExecutorService service = - Executors.newSingleThreadScheduledExecutor( - ThreadUtils.getThreadFactory("NettyDirectMemoryTracker")); - - public NettyDirectMemoryTracker(ShuffleServerConf conf) { - this.reportInitialDelay = - conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY); - this.reportInterval = - conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL); - } - - public void start() { - LOG.info( - "Start report direct memory usage to MetricSystem after {}ms and interval is {}ms", - reportInitialDelay, - reportInterval); - - service.scheduleAtFixedRate( - () -> { - try { - long usedDirectMemoryByNetty = PlatformDependent.usedDirectMemory(); - long usedDirectMemoryByGrpcNetty = - io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory(); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Current usedDirectMemoryByNetty:{}, usedDirectMemoryByGrpcNetty:{}", - usedDirectMemoryByNetty, - usedDirectMemoryByGrpcNetty); - } - ShuffleServerMetrics.gaugeUsedDirectMemorySizeByNetty.set(usedDirectMemoryByNetty); - ShuffleServerMetrics.gaugeUsedDirectMemorySizeByGrpcNetty.set( - usedDirectMemoryByGrpcNetty); - ShuffleServerMetrics.gaugeUsedDirectMemorySize.set( - usedDirectMemoryByNetty + usedDirectMemoryByGrpcNetty); - } catch (Throwable t) { - LOG.error("Failed to report direct memory.", t); - } - }, - reportInitialDelay, - reportInterval, - TimeUnit.MILLISECONDS); - } - - public void stop() { - service.shutdownNow(); - } -} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 60be04b65..4b59cb5ce 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.netty.util.internal.PlatformDependent; import io.prometheus.client.CollectorRegistry; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -73,13 +74,15 @@ import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE; import static org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE; import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL; import static org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN; +import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE; +import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY; +import static org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_NETTY; /** Server that manages startup/shutdown of a {@code Greeter} server. */ public class ShuffleServer { private static final Logger LOG = LoggerFactory.getLogger(ShuffleServer.class); private RegisterHeartBeat registerHeartBeat; - private NettyDirectMemoryTracker directMemoryUsageReporter; private String id; private String ip; private int grpcPort; @@ -156,7 +159,6 @@ public class ShuffleServer { initMetricsReporter(); registerHeartBeat.startHeartBeat(); - directMemoryUsageReporter.start(); Runtime.getRuntime() .addShutdownHook( new Thread() { @@ -184,10 +186,6 @@ public class ShuffleServer { registerHeartBeat.shutdown(); LOG.info("HeartBeat Stopped!"); } - if (directMemoryUsageReporter != null) { - directMemoryUsageReporter.stop(); - LOG.info("Direct memory usage tracker Stopped!"); - } if (storageManager != null) { storageManager.stop(); LOG.info("MultiStorage Stopped!"); @@ -304,7 +302,6 @@ public class ShuffleServer { } registerHeartBeat = new RegisterHeartBeat(this); - directMemoryUsageReporter = new NettyDirectMemoryTracker(shuffleServerConf); shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager); shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled); @@ -320,6 +317,20 @@ public class ShuffleServer { storageManager, shuffleMergeManager); shuffleTaskManager.start(); + ShuffleServerMetrics.addLabeledGauge( + USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double) PlatformDependent.usedDirectMemory()); + ShuffleServerMetrics.addLabeledGauge( + USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY, + () -> + (double) + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory()); + ShuffleServerMetrics.addLabeledGauge( + USED_DIRECT_MEMORY_SIZE, + () -> + (double) + (PlatformDependent.usedDirectMemory() + + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent + .usedDirectMemory())); setServer(); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index 3e886407c..b3e56c0b3 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -18,6 +18,7 @@ package org.apache.uniffle.server; import java.util.Map; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -57,7 +58,7 @@ public class ShuffleServerMetrics { private static final String EVENT_SIZE_THRESHOLD_LEVEL2 = "event_size_threshold_level2"; private static final String EVENT_SIZE_THRESHOLD_LEVEL3 = "event_size_threshold_level3"; private static final String EVENT_SIZE_THRESHOLD_LEVEL4 = "event_size_threshold_level4"; - private static final String EVENT_QUEUE_SIZE = "event_queue_size"; + public static final String EVENT_QUEUE_SIZE = "event_queue_size"; private static final String MERGE_EVENT_QUEUE_SIZE = "merge_event_queue_size"; private static final String HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE = "hadoop_flush_thread_pool_queue_size"; @@ -98,9 +99,9 @@ public class ShuffleServerMetrics { private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size"; private static final String USED_BUFFER_SIZE = "used_buffer_size"; private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size"; - private static final String USED_DIRECT_MEMORY_SIZE = "used_direct_memory_size"; - private static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = "used_direct_memory_size_by_netty"; - private static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY = + public static final String USED_DIRECT_MEMORY_SIZE = "used_direct_memory_size"; + public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = "used_direct_memory_size_by_netty"; + public static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY = "used_direct_memory_size_by_grpc_netty"; private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM = "total_failed_written_event_num"; private static final String TOTAL_DROPPED_EVENT_NUM = "total_dropped_event_num"; @@ -218,11 +219,7 @@ public class ShuffleServerMetrics { public static Gauge.Child gaugeInFlushBufferSize; public static Gauge.Child gaugeUsedBufferSize; public static Gauge.Child gaugeReadBufferUsedSize; - public static Gauge.Child gaugeUsedDirectMemorySize; - public static Gauge.Child gaugeUsedDirectMemorySizeByNetty; - public static Gauge.Child gaugeUsedDirectMemorySizeByGrpcNetty; public static Gauge.Child gaugeWriteHandler; - public static Gauge.Child gaugeEventQueueSize; public static Gauge.Child gaugeMergeEventQueueSize; public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize; public static Gauge.Child gaugeLocalfileFlushThreadPoolQueueSize; @@ -449,13 +446,7 @@ public class ShuffleServerMetrics { gaugeInFlushBufferSize = metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE); gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE); gaugeReadBufferUsedSize = metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE); - gaugeUsedDirectMemorySize = metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE); - gaugeUsedDirectMemorySizeByNetty = - metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_NETTY); - gaugeUsedDirectMemorySizeByGrpcNetty = - metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY); gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER); - gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE); gaugeMergeEventQueueSize = metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE); gaugeHadoopFlushThreadPoolQueueSize = metricsManager.addLabeledGauge(HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE); @@ -521,4 +512,8 @@ public class ShuffleServerMetrics { .labelNames("app_id") .register(metricsManager.getCollectorRegistry()); } + + public static void addLabeledGauge(String name, Supplier<Double> supplier) { + metricsManager.addLabeledGauge(name, supplier); + } }