Copilot commented on code in PR #7882: URL: https://github.com/apache/incubator-seata/pull/7882#discussion_r2660205794
########## namingserver/src/main/java/org/apache/seata/namingserver/metrics/NamingServerMetricsManager.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.metrics; + +import org.apache.seata.namingserver.entity.pojo.ClusterData; +import org.apache.seata.namingserver.listener.Watcher; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + +public interface NamingServerMetricsManager { + + // Metric names + String METRIC_CLUSTER_NODE_COUNT = "seata_namingserver_cluster_node_count"; + String METRIC_WATCHER_COUNT = "seata_namingserver_watcher_count"; + String METRIC_CLUSTER_CHANGE_PUSH_TOTAL = "seata_namingserver_cluster_change_push_total"; + + // Tag names + String TAG_NAMESPACE = "namespace"; + String TAG_CLUSTER = "cluster"; + String TAG_UNIT = "unit"; + String TAG_VGROUP = "vgroup"; + + void setNamespaceClusterDataSupplier(Supplier<ConcurrentMap<String, ConcurrentMap<String, ClusterData>>> supplier); + + void setWatchersSupplier(Supplier<Map<String, Queue<Watcher<?>>>> supplier); + + void refreshClusterNodeCountMetrics(); + + void refreshWatcherCountMetrics(); + + void 
incrementClusterChangePushCount(String vgroup); + Review Comment: The NamingServerMetricsManager interface lacks documentation explaining the purpose of each method, the expected behavior of implementations, and when these methods should be called. Consider adding JavaDoc comments for the interface and each method to help implementers understand the contract. ```suggestion /** * Metrics manager interface for the naming server. * <p> * Implementations of this interface are responsible for: * <ul> * <li>Exposing metrics related to cluster nodes and watchers.</li> * <li>Tracking the number of cluster change push events per virtual group.</li> * <li>Pulling the latest data from the configured suppliers when metrics are refreshed.</li> * </ul> * <p> * Concrete implementations are typically instantiated once and used as a central place * to collect and publish metrics to the underlying metrics system. */ public interface NamingServerMetricsManager { /** * Metric name for the number of nodes in a cluster. * <p> * Implementations should use this name when registering or updating a metric * that reflects the current node count for each namespace/cluster/unit combination. */ String METRIC_CLUSTER_NODE_COUNT = "seata_namingserver_cluster_node_count"; /** * Metric name for the number of active watchers. * <p> * Implementations should use this name when registering or updating a metric * that reflects the current watcher count for each vgroup. */ String METRIC_WATCHER_COUNT = "seata_namingserver_watcher_count"; /** * Metric name for the total number of cluster change push events. * <p> * Implementations should use this name when registering or updating a counter * that tracks how many cluster change notifications have been pushed per vgroup. */ String METRIC_CLUSTER_CHANGE_PUSH_TOTAL = "seata_namingserver_cluster_change_push_total"; /** * Tag name for the namespace in which the metric is recorded. */ String TAG_NAMESPACE = "namespace"; /** * Tag name for the cluster identifier.
*/ String TAG_CLUSTER = "cluster"; /** * Tag name for the unit (for example, data center or logical unit). */ String TAG_UNIT = "unit"; /** * Tag name for the virtual group (vgroup) identifier. */ String TAG_VGROUP = "vgroup"; /** * Configure the supplier that provides the current cluster data grouped by namespace * and cluster name. * <p> * The supplied map is expected to be: * <ul> * <li>Key: namespace identifier.</li> * <li>Value: a map whose key is the cluster name and value is the {@link ClusterData}.</li> * </ul> * Implementations should call this supplier when refreshing cluster node count metrics * rather than holding their own copy of the data. * * @param supplier supplier of the namespace-to-cluster data map; must be thread-safe * and return the latest view of cluster data. */ void setNamespaceClusterDataSupplier(Supplier<ConcurrentMap<String, ConcurrentMap<String, ClusterData>>> supplier); /** * Configure the supplier that provides the current watchers grouped by key. * <p> * The supplied map is expected to be: * <ul> * <li>Key: a string identifier (such as namespace/cluster) associated with the watchers.</li> * <li>Value: a queue of {@link Watcher} instances registered for that key.</li> * </ul> * Implementations should call this supplier when refreshing watcher count metrics. * * @param supplier supplier of the current watchers; must be thread-safe * and return the latest view of watcher registrations. */ void setWatchersSupplier(Supplier<Map<String, Queue<Watcher<?>>>> supplier); /** * Refresh the metrics that represent the number of nodes in each cluster. * <p> * Implementations should obtain the latest cluster data from the configured * {@link #setNamespaceClusterDataSupplier(Supplier)} and update the underlying * metrics system accordingly. This method is typically called periodically or * when the cluster topology changes. */ void refreshClusterNodeCountMetrics(); /** * Refresh the metrics that represent the number of registered watchers. 
* <p> * Implementations should obtain the latest watcher data from the configured * {@link #setWatchersSupplier(Supplier)} and update the underlying metrics system. * This method is typically called periodically or when watcher registrations change. */ void refreshWatcherCountMetrics(); /** * Increment the total count of cluster change push events for the specified virtual group. * <p> * This method should be called each time a cluster change notification is pushed to * clients associated with the given vgroup. Implementations should record the increment * in the underlying metrics system using {@link #METRIC_CLUSTER_CHANGE_PUSH_TOTAL}. * * @param vgroup the virtual group identifier for which the push event occurred */ void incrementClusterChangePushCount(String vgroup); /** * Get the current total count of cluster change push events for the specified virtual group. * <p> * The value returned should correspond to the number of times * {@link #incrementClusterChangePushCount(String)} has been successfully recorded for * the given vgroup since the metrics manager was initialized. * * @param vgroup the virtual group identifier * @return the total number of cluster change push events recorded for the given vgroup */ ``` ########## namingserver/src/main/java/org/apache/seata/namingserver/metrics/NamingServerTagsContributor.java: ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver.metrics; + +import io.micrometer.common.KeyValue; +import io.micrometer.common.KeyValues; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.server.observation.DefaultServerRequestObservationConvention; +import org.springframework.http.server.observation.ServerRequestObservationContext; +import org.springframework.stereotype.Component; + +/** + * Custom tags contributor for HTTP server request metrics. + * Adds Seata business dimensions (namespace, cluster, vgroup) to + * http.server.requests metrics. + */ +@Component +@ConditionalOnProperty(name = "seata.namingserver.metrics.enabled", havingValue = "true") +public class NamingServerTagsContributor extends DefaultServerRequestObservationConvention { + + private static final String TAG_NAMESPACE = "namespace"; + private static final String TAG_CLUSTER = "cluster"; + private static final String TAG_VGROUP = "vgroup"; + private static final String UNKNOWN = "unknown"; + + @Override + public KeyValues getLowCardinalityKeyValues(ServerRequestObservationContext context) { + KeyValues keyValues = super.getLowCardinalityKeyValues(context); + + // Add namespace tag + String namespace = context.getCarrier().getParameter(TAG_NAMESPACE); + keyValues = keyValues.and(KeyValue.of(TAG_NAMESPACE, namespace != null ? 
namespace : UNKNOWN)); + + // Add cluster tag + String cluster = context.getCarrier().getParameter(TAG_CLUSTER); + if (cluster == null) { + cluster = context.getCarrier().getParameter("clusterName"); + } + keyValues = keyValues.and(KeyValue.of(TAG_CLUSTER, cluster != null ? cluster : UNKNOWN)); + + // Add vgroup tag + String vgroup = context.getCarrier().getParameter(TAG_VGROUP); + if (vgroup == null) { + vgroup = context.getCarrier().getParameter("group"); + } + keyValues = keyValues.and(KeyValue.of(TAG_VGROUP, vgroup != null ? vgroup : UNKNOWN)); + + return keyValues; + } Review Comment: The NamingServerTagsContributor adds business tags (namespace, cluster, vgroup) to ALL HTTP requests, which will significantly increase metric cardinality for http.server.requests. Because the tag values are copied verbatim from client-supplied request parameters and are emitted as low-cardinality key values, the number of distinct time series is effectively unbounded: any caller can create new series simply by sending arbitrary parameter values. This could lead to performance and memory issues, especially in high-traffic environments. Consider adding these tags only to specific endpoints (e.g., those under /naming/v1/**) by checking the request URI before adding the tags, or using a separate metric name for naming-related requests. ########## namingserver/src/test/java/org/apache/seata/namingserver/NamingServerMetricsManagerTest.java: ########## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.namingserver; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.apache.seata.common.metadata.namingserver.NamingServerNode; +import org.apache.seata.common.metadata.namingserver.Unit; +import org.apache.seata.namingserver.entity.pojo.ClusterData; +import org.apache.seata.namingserver.listener.Watcher; +import org.apache.seata.namingserver.metrics.PrometheusNamingMetricsManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.apache.seata.namingserver.metrics.NamingServerMetricsManager.*; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for PrometheusNamingMetricsManager. 
+ */ +class NamingServerMetricsManagerTest { + + private MeterRegistry meterRegistry; + private PrometheusNamingMetricsManager metricsManager; + + @BeforeEach + void setUp() { + meterRegistry = new SimpleMeterRegistry(); + metricsManager = new PrometheusNamingMetricsManager(meterRegistry); + metricsManager.init(); + } + + @Test + void testClusterNodeCountMetrics() { + // Prepare test data + ConcurrentMap<String, ConcurrentMap<String, ClusterData>> namespaceClusterDataMap = new ConcurrentHashMap<>(); + + // Create namespace -> cluster -> unit structure + String namespace = "test-namespace"; + String clusterName = "test-cluster"; + String unitName = "test-unit"; + + ClusterData clusterData = new ClusterData(clusterName, "default"); + Unit unit = new Unit(); + unit.setUnitName(unitName); + unit.setNamingInstanceList(new CopyOnWriteArrayList<>()); + + // Add some nodes + NamingServerNode node1 = new NamingServerNode(); + NamingServerNode node2 = new NamingServerNode(); + unit.getNamingInstanceList().add(node1); + unit.getNamingInstanceList().add(node2); + + clusterData.getUnitData().put(unitName, unit); + + ConcurrentMap<String, ClusterData> clusterDataMap = new ConcurrentHashMap<>(); + clusterDataMap.put(clusterName, clusterData); + namespaceClusterDataMap.put(namespace, clusterDataMap); + + // Set the supplier + metricsManager.setNamespaceClusterDataSupplier(() -> namespaceClusterDataMap); + + // Manually trigger metrics refresh + metricsManager.refreshClusterNodeCountMetrics(); + + // Verify the metric is registered + Meter meter = meterRegistry + .find(METRIC_CLUSTER_NODE_COUNT) + .tag(TAG_NAMESPACE, namespace) + .tag(TAG_CLUSTER, clusterName) + .tag(TAG_UNIT, unitName) + .meter(); + + assertNotNull(meter, "Cluster node count metric should be registered"); + } + + @Test + void testWatcherCountMetrics() { + // Prepare test data + Map<String, Queue<Watcher<?>>> watchers = new ConcurrentHashMap<>(); + String vgroup = "test-vgroup"; + + Queue<Watcher<?>> watcherQueue = 
new ConcurrentLinkedQueue<>(); + watcherQueue.add(new Watcher<>(vgroup, null, 5000, 1L, "127.0.0.1")); + watcherQueue.add(new Watcher<>(vgroup, null, 5000, 1L, "127.0.0.2")); + watchers.put(vgroup, watcherQueue); + + // Set the supplier + metricsManager.setWatchersSupplier(() -> watchers); + + // Manually trigger metrics refresh + metricsManager.refreshWatcherCountMetrics(); + + // Verify the metric is registered + Meter meter = + meterRegistry.find(METRIC_WATCHER_COUNT).tag(TAG_VGROUP, vgroup).meter(); + + assertNotNull(meter, "Watcher count metric should be registered"); + } + + @Test + void testClusterChangePushCounter() { + String vgroup1 = "vgroup1"; + String vgroup2 = "vgroup2"; + + // Increment counters + metricsManager.incrementClusterChangePushCount(vgroup1); + metricsManager.incrementClusterChangePushCount(vgroup1); + metricsManager.incrementClusterChangePushCount(vgroup2); + + // Verify counts + assertEquals(2.0, metricsManager.getClusterChangePushCount(vgroup1)); + assertEquals(1.0, metricsManager.getClusterChangePushCount(vgroup2)); + + // Verify metrics are registered in registry + assertNotNull(meterRegistry + .find(METRIC_CLUSTER_CHANGE_PUSH_TOTAL) + .tag(TAG_VGROUP, vgroup1) + .counter()); + assertNotNull(meterRegistry + .find(METRIC_CLUSTER_CHANGE_PUSH_TOTAL) + .tag(TAG_VGROUP, vgroup2) + .counter()); + } + + @Test + void testMultiGaugeCleanupStaleTags() { + // Prepare initial test data with two namespaces + ConcurrentMap<String, ConcurrentMap<String, ClusterData>> namespaceClusterDataMap = new ConcurrentHashMap<>(); + + String namespace1 = "namespace1"; + String namespace2 = "namespace2"; + String clusterName = "cluster1"; + String unitName = "unit1"; + + // Create two clusters in different namespaces + for (String namespace : new String[] {namespace1, namespace2}) { + ClusterData clusterData = new ClusterData(clusterName, "default"); + Unit unit = new Unit(); + unit.setUnitName(unitName); + unit.setNamingInstanceList(new 
CopyOnWriteArrayList<>()); + unit.getNamingInstanceList().add(new NamingServerNode()); + clusterData.getUnitData().put(unitName, unit); + + ConcurrentMap<String, ClusterData> clusterDataMap = new ConcurrentHashMap<>(); + clusterDataMap.put(clusterName, clusterData); + namespaceClusterDataMap.put(namespace, clusterDataMap); + } + + metricsManager.setNamespaceClusterDataSupplier(() -> namespaceClusterDataMap); + + // Manually trigger metrics refresh + metricsManager.refreshClusterNodeCountMetrics(); + + // Verify both metrics exist + assertNotNull(meterRegistry + .find(METRIC_CLUSTER_NODE_COUNT) + .tag(TAG_NAMESPACE, namespace1) + .meter()); + assertNotNull(meterRegistry + .find(METRIC_CLUSTER_NODE_COUNT) + .tag(TAG_NAMESPACE, namespace2) + .meter()); + + // Remove namespace2 + namespaceClusterDataMap.remove(namespace2); + + // Manually trigger metrics refresh + metricsManager.refreshClusterNodeCountMetrics(); + + // Verify namespace1 still exists but namespace2 is cleaned up + assertNotNull(meterRegistry + .find(METRIC_CLUSTER_NODE_COUNT) + .tag(TAG_NAMESPACE, namespace1) + .meter()); + // Note: MultiGauge with overwrite=true will remove stale entries + } + + @Test + void testNullSupplierHandling() { + // Don't set any suppliers + // Manually trigger refresh - should not throw exception + metricsManager.refreshClusterNodeCountMetrics(); + metricsManager.refreshWatcherCountMetrics(); + + // No exception means test passed + assertTrue(true); Review Comment: The assertion `assertTrue(true)` is redundant and provides no value. This test only verifies that no exception is thrown. Consider removing this assertion or replacing it with an assertion that actually validates the expected behavior, such as verifying that no metrics are registered in the meter registry. 
```suggestion // Verify no metrics are registered when suppliers are not set assertEquals(0, meterRegistry.find(METRIC_CLUSTER_NODE_COUNT).meters().size()); assertEquals(0, meterRegistry.find(METRIC_WATCHER_COUNT).meters().size()); ``` ########## namingserver/src/main/java/org/apache/seata/namingserver/metrics/PrometheusNamingMetricsManager.java: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.namingserver.metrics; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.MultiGauge; +import io.micrometer.core.instrument.Tags; +import jakarta.annotation.PostConstruct; +import org.apache.seata.common.metadata.namingserver.Unit; +import org.apache.seata.namingserver.entity.pojo.ClusterData; +import org.apache.seata.namingserver.listener.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + Review Comment: The PrometheusNamingMetricsManager class lacks class-level documentation explaining its purpose, usage, and how it integrates with the NamingServer. Consider adding a JavaDoc comment that describes the metrics it manages, when metrics are refreshed, and the configuration property that controls its activation. ```suggestion /** * Metrics manager for the NamingServer backed by Micrometer and Prometheus. * <p> * This component is responsible for exposing NamingServer runtime metrics via the injected * {@link MeterRegistry}. 
It manages, among others: * <ul> * <li>a {@link io.micrometer.core.instrument.MultiGauge} for the number of registered Seata Server * nodes per namespace/cluster/unit ({@code METRIC_CLUSTER_NODE_COUNT});</li> * <li>a {@link io.micrometer.core.instrument.MultiGauge} for the number of active HTTP * long-polling watchers per vgroup ({@code METRIC_WATCHER_COUNT});</li> * <li>{@link Counter} instances tracking cluster change push events per vgroup ({@code METRIC_CLUSTER_CHANGE_PUSH_TOTAL}).</li> * </ul> * <p> * The gauges are initialized in {@link #init()} (invoked after construction via {@link PostConstruct}) * and refreshed whenever {@link #refreshClusterNodeCountMetrics()} or {@link #refreshWatcherCountMetrics()} * is called; each refresh pulls the current in-memory namespace/cluster data or watcher registrations * from the suppliers configured via {@link #setNamespaceClusterDataSupplier(Supplier)} and {@link #setWatchersSupplier(Supplier)}. * No periodic polling is performed by this class; instead, callers are expected to trigger metric * updates when underlying state changes. * <p> * Activation of this metrics manager is controlled by the Spring Boot configuration property * {@code seata.namingserver.metrics.enabled}. The bean is only created when this property is * set to {@code true}, as declared by the {@link ConditionalOnProperty} annotation. */ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
