This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b94b2b698 [client] Aggregate Fluss client connection metrics to reduce the number of metrics (#1896)
b94b2b698 is described below

commit b94b2b698e2ba686e6580e0e2876b31eb37b3abf
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Nov 5 15:27:42 2025 +0800

    [client] Aggregate Fluss client connection metrics to reduce the number of metrics (#1896)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  17 ++--
 .../fluss/rpc/metrics/ClientMetricGroup.java       |  78 +++++++++++++-
 ...tionMetricGroup.java => ConnectionMetrics.java} |  73 +++++++------
 .../fluss/rpc/netty/client/ServerConnection.java   |  22 ++--
 .../fluss/rpc/TestingTabletGatewayService.java     |   2 +-
 .../rpc/netty/client/ServerConnectionTest.java     | 113 +++++++++++++++++++--
 6 files changed, 239 insertions(+), 66 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index be8d644e7..4008c6b19 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -143,12 +143,17 @@ public class MetricNames {
     // 
--------------------------------------------------------------------------------------------
     // metrics for rpc client
     // 
--------------------------------------------------------------------------------------------
-    public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
-    public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
-    public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
-    public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
-    public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
-    public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
+    public static final String CLIENT_REQUESTS_RATE_AVG = 
"requestsPerSecond_avg";
+    public static final String CLIENT_REQUESTS_RATE_TOTAL = 
"requestsPerSecond_total";
+    public static final String CLIENT_RESPONSES_RATE_AVG = 
"responsesPerSecond_avg";
+    public static final String CLIENT_RESPONSES_RATE_TOTAL = 
"responsesPerSecond_total";
+    public static final String CLIENT_BYTES_IN_RATE_AVG = 
"bytesInPerSecond_avg";
+    public static final String CLIENT_BYTES_IN_RATE_TOTAL = 
"bytesInPerSecond_total";
+    public static final String CLIENT_BYTES_OUT_RATE_AVG = 
"bytesOutPerSecond_avg";
+    public static final String CLIENT_BYTES_OUT_RATE_TOTAL = 
"bytesOutPerSecond_total";
+    public static final String CLIENT_REQUEST_LATENCY_MS_AVG = 
"requestLatencyMs_avg";
+    public static final String CLIENT_REQUEST_LATENCY_MS_MAX = 
"requestLatencyMs_max";
+    public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL = 
"requestsInFlight_total";
 
     // 
--------------------------------------------------------------------------------------------
     // metrics for client
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
index 163ee8fce..02abeaac7 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
@@ -18,13 +18,18 @@
 package org.apache.fluss.rpc.metrics;
 
 import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.utils.MapUtils;
 
 import java.util.Map;
+import java.util.function.ToLongFunction;
 
 /** The metric group for clients. */
 public class ClientMetricGroup extends AbstractMetricGroup {
+    private final Map<String, ConnectionMetrics> nodeToConnectionMetrics =
+            MapUtils.newConcurrentHashMap();
 
     private static final String NAME = "client";
 
@@ -33,6 +38,39 @@ public class ClientMetricGroup extends AbstractMetricGroup {
     public ClientMetricGroup(MetricRegistry registry, String clientId) {
         super(registry, new String[] {NAME}, null);
         this.clientId = clientId;
+        this.gauge(
+                MetricNames.CLIENT_REQUESTS_RATE_AVG,
+                () -> getMetricsAvg(ConnectionMetrics.Metrics::requestRate));
+        this.gauge(
+                MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
+                () -> getMetricsSum(ConnectionMetrics.Metrics::requestRate));
+        this.gauge(
+                MetricNames.CLIENT_RESPONSES_RATE_AVG,
+                () -> getMetricsAvg(ConnectionMetrics.Metrics::responseRate));
+        this.gauge(
+                MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
+                () -> getMetricsSum(ConnectionMetrics.Metrics::responseRate));
+        this.gauge(
+                MetricNames.CLIENT_BYTES_IN_RATE_AVG,
+                () -> getMetricsAvg(ConnectionMetrics.Metrics::byteInRate));
+        this.gauge(
+                MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
+                () -> getMetricsSum(ConnectionMetrics.Metrics::byteInRate));
+        this.gauge(
+                MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
+                () -> getMetricsAvg(ConnectionMetrics.Metrics::byteOutRate));
+        this.gauge(
+                MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
+                () -> getMetricsSum(ConnectionMetrics.Metrics::byteOutRate));
+        this.gauge(
+                MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
+                () -> 
getMetricsAvg(ConnectionMetrics.Metrics::requestLatencyMs));
+        this.gauge(
+                MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
+                () -> 
getMetricsMax(ConnectionMetrics.Metrics::requestLatencyMs));
+        this.gauge(
+                MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
+                () -> 
getMetricsSum(ConnectionMetrics.Metrics::requestsInFlight));
     }
 
     @Override
@@ -49,7 +87,43 @@ public class ClientMetricGroup extends AbstractMetricGroup {
         return registry;
     }
 
-    public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
-        return new ConnectionMetricGroup(registry, serverId, this);
+    public ConnectionMetrics createConnectionMetricGroup(String serverId) {
+        // Only expose aggregate metrics to reduce the reporter pressure.
+        ConnectionMetrics connectionMetrics = new ConnectionMetrics(serverId, 
this);
+        nodeToConnectionMetrics.put(serverId, connectionMetrics);
+        return connectionMetrics;
+    }
+
+    public void removeConnectionMetricGroup(String serverId, ConnectionMetrics 
connectionMetrics) {
+        nodeToConnectionMetrics.remove(serverId, connectionMetrics);
+    }
+
+    private double getMetricsAvg(ToLongFunction<ConnectionMetrics.Metrics> 
metricGetter) {
+        return nodeToConnectionMetrics.values().stream()
+                .flatMap(
+                        connectionMetricGroup ->
+                                
connectionMetricGroup.metricsByRequestName.values().stream())
+                .mapToLong(metricGetter)
+                .average()
+                .orElse(0);
+    }
+
+    private long getMetricsSum(ToLongFunction<ConnectionMetrics.Metrics> 
metricGetter) {
+        return nodeToConnectionMetrics.values().stream()
+                .flatMap(
+                        connectionMetricGroup ->
+                                
connectionMetricGroup.metricsByRequestName.values().stream())
+                .mapToLong(metricGetter)
+                .sum();
+    }
+
+    private long getMetricsMax(ToLongFunction<ConnectionMetrics.Metrics> 
metricGetter) {
+        return nodeToConnectionMetrics.values().stream()
+                .flatMap(
+                        connectionMetricGroup ->
+                                
connectionMetricGroup.metricsByRequestName.values().stream())
+                .mapToLong(metricGetter)
+                .max()
+                .orElse(0);
     }
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
similarity index 61%
rename from fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java
rename to fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
index 849e6a22a..6b1fd9064 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
@@ -17,14 +17,8 @@
 
 package org.apache.fluss.rpc.metrics;
 
-import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
-import org.apache.fluss.metrics.groups.AbstractMetricGroup;
-import org.apache.fluss.metrics.groups.MetricGroup;
-import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.utils.MapUtils;
 
@@ -35,38 +29,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
-
 /** Metrics for ServerConnection with {@link ClientMetricGroup} as parent 
group. */
-public class ConnectionMetricGroup extends AbstractMetricGroup {
+public class ConnectionMetrics {
     private static final List<ApiKeys> REPORT_API_KEYS =
             Arrays.asList(ApiKeys.PRODUCE_LOG, ApiKeys.FETCH_LOG, 
ApiKeys.PUT_KV, ApiKeys.LOOKUP);
 
     private final String serverId;
+    private final ClientMetricGroup clientMetricGroup;
 
     /** Metrics for different request/response metrics with specify {@link 
ApiKeys}. */
-    private final Map<String, Metrics> metricsByRequestName = 
MapUtils.newConcurrentHashMap();
+    final Map<String, Metrics> metricsByRequestName = 
MapUtils.newConcurrentHashMap();
 
-    public ConnectionMetricGroup(
-            MetricRegistry registry, String serverId, ClientMetricGroup 
parent) {
-        super(registry, makeScope(parent, serverId), parent);
+    public ConnectionMetrics(String serverId, ClientMetricGroup 
clientMetricGroup) {
         this.serverId = serverId;
+        this.clientMetricGroup = clientMetricGroup;
     }
 
-    @Override
-    protected void putVariables(Map<String, String> variables) {
-        variables.put("server_id", serverId);
-    }
-
-    @Override
-    protected String getGroupName(CharacterFilter filter) {
-        return "";
-    }
-
-    @Override
-    protected String createLogicalScope(CharacterFilter filter, char 
delimiter) {
-        // ignore this metric group name in logical scope
-        return parent.getLogicalScope(filter, delimiter);
+    public void close() {
+        clientMetricGroup.removeConnectionMetricGroup(serverId, this);
     }
 
     // ------------------------------------------------------------------------
@@ -93,16 +73,15 @@ public class ConnectionMetricGroup extends 
AbstractMetricGroup {
     }
 
     @Nullable
-    private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
+    Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
         if (!REPORT_API_KEYS.contains(apikey)) {
             return null;
         }
 
-        return metricsByRequestName.computeIfAbsent(
-                apikey.name(), keyName -> new Metrics(this.addGroup("request", 
keyName)));
+        return metricsByRequestName.computeIfAbsent(apikey.name(), keyName -> 
new Metrics());
     }
 
-    private static final class Metrics {
+    static final class Metrics {
         final Counter requests;
         final Counter responses;
         final Counter inComingBytes;
@@ -111,18 +90,36 @@ public class ConnectionMetricGroup extends 
AbstractMetricGroup {
         volatile long requestLatencyMs;
         final AtomicInteger requestsInFlight;
 
-        private Metrics(MetricGroup metricGroup) {
+        private Metrics() {
             requests = new ThreadSafeSimpleCounter();
-            metricGroup.meter(MetricNames.CLIENT_REQUESTS_RATE, new 
MeterView(requests));
             responses = new ThreadSafeSimpleCounter();
-            metricGroup.meter(MetricNames.CLIENT_RESPONSES_RATE, new 
MeterView(responses));
             inComingBytes = new ThreadSafeSimpleCounter();
-            metricGroup.meter(MetricNames.CLIENT_BYTES_IN_RATE, new 
MeterView(inComingBytes));
             outGoingBytes = new ThreadSafeSimpleCounter();
-            metricGroup.meter(MetricNames.CLIENT_BYTES_OUT_RATE, new 
MeterView(outGoingBytes));
-            metricGroup.gauge(MetricNames.CLIENT_REQUEST_LATENCY_MS, () -> 
requestLatencyMs);
             requestsInFlight = new AtomicInteger(0);
-            metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT, 
requestsInFlight::get);
+        }
+
+        long requestRate() {
+            return requests.getCount();
+        }
+
+        long responseRate() {
+            return responses.getCount();
+        }
+
+        long byteInRate() {
+            return inComingBytes.getCount();
+        }
+
+        long byteOutRate() {
+            return outGoingBytes.getCount();
+        }
+
+        long requestLatencyMs() {
+            return requestLatencyMs;
+        }
+
+        long requestsInFlight() {
+            return requestsInFlight.get();
         }
     }
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 3aa85e24f..a09a50c09 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -29,7 +29,7 @@ import org.apache.fluss.rpc.messages.ApiVersionsResponse;
 import org.apache.fluss.rpc.messages.AuthenticateRequest;
 import org.apache.fluss.rpc.messages.AuthenticateResponse;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
-import org.apache.fluss.rpc.metrics.ConnectionMetricGroup;
+import org.apache.fluss.rpc.metrics.ConnectionMetrics;
 import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.rpc.protocol.ApiManager;
 import org.apache.fluss.rpc.protocol.ApiMethod;
@@ -72,7 +72,7 @@ final class ServerConnection {
     // TODO: add max inflight requests limit like Kafka's 
"max.in.flight.requests.per.connection"
     private final Map<Integer, InflightRequest> inflightRequests = 
MapUtils.newConcurrentHashMap();
     private final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
-    private final ConnectionMetricGroup connectionMetricGroup;
+    private final ConnectionMetrics connectionMetrics;
     private final ClientAuthenticator authenticator;
     private final ExponentialBackoff backoff;
 
@@ -108,7 +108,7 @@ final class ServerConnection {
             boolean isInnerClient) {
         this.node = node;
         this.state = ConnectionState.CONNECTING;
-        this.connectionMetricGroup = 
clientMetricGroup.createConnectionMetricGroup(node.uid());
+        this.connectionMetrics = 
clientMetricGroup.createConnectionMetricGroup(node.uid());
         this.authenticator = authenticator;
         this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
         whenClose(closeCallback);
@@ -190,7 +190,7 @@ final class ServerConnection {
                 closeFuture.completeExceptionally(cause);
             }
 
-            connectionMetricGroup.close();
+            connectionMetrics.close();
         }
 
         closeQuietly(authenticator);
@@ -220,7 +220,7 @@ final class ServerConnection {
         public void onRequestResult(int requestId, ApiMessage response) {
             InflightRequest request = inflightRequests.remove(requestId);
             if (request != null && !request.responseFuture.isDone()) {
-                connectionMetricGroup.updateMetricsAfterGetResponse(
+                connectionMetrics.updateMetricsAfterGetResponse(
                         ApiKeys.forId(request.apiKey),
                         request.requestStartTime,
                         response.totalSize());
@@ -232,7 +232,7 @@ final class ServerConnection {
         public void onRequestFailure(int requestId, Throwable cause) {
             InflightRequest request = inflightRequests.remove(requestId);
             if (request != null && !request.responseFuture.isDone()) {
-                connectionMetricGroup.updateMetricsAfterGetResponse(
+                connectionMetrics.updateMetricsAfterGetResponse(
                         ApiKeys.forId(request.apiKey), 
request.requestStartTime, 0);
                 request.responseFuture.completeExceptionally(cause);
             }
@@ -329,14 +329,14 @@ final class ServerConnection {
                 return responseFuture;
             }
 
-            connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, 
rawRequest.totalSize());
+            connectionMetrics.updateMetricsBeforeSendRequest(apiKey, 
rawRequest.totalSize());
 
             channel.writeAndFlush(byteBuf)
                     .addListener(
                             (ChannelFutureListener)
                                     future -> {
                                         if (!future.isSuccess()) {
-                                            
connectionMetricGroup.updateMetricsAfterGetResponse(
+                                            
connectionMetrics.updateMetricsAfterGetResponse(
                                                     apiKey, 
inflight.requestStartTime, 0);
                                             Throwable cause = future.cause();
                                             if (cause instanceof IOException) {
@@ -449,8 +449,10 @@ final class ServerConnection {
     }
 
     private void switchState(ConnectionState targetState) {
-        LOG.debug("switch state form {} to {}", state, targetState);
-        state = targetState;
+        if (state != ConnectionState.DISCONNECTED) {
+            LOG.debug("switch state form {} to {}", state, targetState);
+            state = targetState;
+        }
         if (targetState == ConnectionState.READY) {
             // process pending requests
             PendingRequest pending;
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 30ef735cd..7db365438 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -122,7 +122,7 @@ public class TestingTabletGatewayService extends 
TestingGatewayService
 
     @Override
     public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
-        return null;
+        return CompletableFuture.completedFuture(new LookupResponse());
     }
 
     @Override
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 6b2363c88..554926ac4 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -22,11 +22,20 @@ import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.MetricType;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.groups.MetricGroup;
+import org.apache.fluss.metrics.registry.NOPMetricRegistry;
 import org.apache.fluss.metrics.util.NOPMetricsGroup;
 import org.apache.fluss.rpc.TestingGatewayService;
+import org.apache.fluss.rpc.TestingTabletGatewayService;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.LookupRequest;
+import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
 import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
 import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
 import org.apache.fluss.rpc.netty.server.NettyServer;
@@ -42,9 +51,23 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL;
+import static 
org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_TOTAL;
+import static 
org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG;
+import static 
org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_TOTAL;
 import static 
org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
 import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
 import static org.apache.fluss.utils.NetUtils.getAvailablePort;
@@ -60,12 +83,13 @@ public class ServerConnectionTest {
     private Configuration conf;
     private NettyServer nettyServer;
     private ServerNode serverNode;
+    private ServerNode serverNode2;
     private TestingGatewayService service;
 
     @BeforeEach
     void setUp() throws Exception {
         conf = new Configuration();
-        buildNettyServer(0);
+        buildNettyServer();
 
         eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test");
         bootstrap =
@@ -121,22 +145,93 @@ public class ServerConnectionTest {
         assertThat(future.isDone()).isTrue();
     }
 
-    private void buildNettyServer(int serverId) throws Exception {
-        try (NetUtils.Port availablePort = getAvailablePort()) {
+    @Test
+    void testConnectionMetrics() throws ExecutionException, 
InterruptedException {
+        MockMetricRegistry metricRegistry = new MockMetricRegistry();
+        ClientMetricGroup client = new ClientMetricGroup(metricRegistry, 
"client");
+        ServerConnection connection =
+                new ServerConnection(
+                        bootstrap,
+                        serverNode,
+                        client,
+                        clientAuthenticator,
+                        (con, ignore) -> {},
+                        false);
+        ServerConnection connection2 =
+                new ServerConnection(
+                        bootstrap,
+                        serverNode2,
+                        client,
+                        clientAuthenticator,
+                        (con, ignore) -> {},
+                        false);
+        LookupRequest request = new LookupRequest().setTableId(1);
+        PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
+        pbLookupReqForBucket.setBucketId(1);
+        assertThat(metricRegistry.registeredMetrics).hasSize(11);
+
+        connection.send(ApiKeys.LOOKUP, request).get();
+        connection2.send(ApiKeys.LOOKUP, request).get();
+
+        assertThat(metricRegistry.registeredMetrics).hasSize(11);
+        assertThat(metricRegistry.registeredMetrics.keySet())
+                .containsExactlyInAnyOrder(
+                        CLIENT_REQUESTS_RATE_AVG,
+                        CLIENT_REQUESTS_RATE_TOTAL,
+                        CLIENT_RESPONSES_RATE_AVG,
+                        CLIENT_RESPONSES_RATE_TOTAL,
+                        CLIENT_BYTES_IN_RATE_AVG,
+                        CLIENT_BYTES_IN_RATE_TOTAL,
+                        CLIENT_BYTES_OUT_RATE_AVG,
+                        CLIENT_BYTES_OUT_RATE_TOTAL,
+                        CLIENT_REQUEST_LATENCY_MS_AVG,
+                        CLIENT_REQUEST_LATENCY_MS_MAX,
+                        CLIENT_REQUESTS_IN_FLIGHT_TOTAL);
+        Metric metric = 
metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG);
+        assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
+        assertThat(((Gauge<?>) metric).getValue()).isEqualTo(1.0);
+        metric = 
metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_TOTAL);
+        assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
+        assertThat(((Gauge<?>) metric).getValue()).isEqualTo(2L);
+        connection.close().get();
+    }
+
+    private void buildNettyServer() throws Exception {
+        try (NetUtils.Port availablePort = getAvailablePort();
+                NetUtils.Port availablePort2 = getAvailablePort()) {
             serverNode =
                     new ServerNode(
-                            serverId, "localhost", availablePort.getPort(), 
ServerType.COORDINATOR);
-            service = new TestingGatewayService();
+                            1, "localhost", availablePort.getPort(), 
ServerType.TABLET_SERVER);
+            serverNode2 =
+                    new ServerNode(
+                            2, "localhost", availablePort2.getPort(), 
ServerType.TABLET_SERVER);
+            service = new TestingTabletGatewayService();
             MetricGroup metricGroup = NOPMetricsGroup.newInstance();
             nettyServer =
                     new NettyServer(
                             conf,
-                            Collections.singleton(
-                                    new Endpoint(serverNode.host(), 
serverNode.port(), "INTERNAL")),
+                            Arrays.asList(
+                                    new Endpoint(serverNode.host(), 
serverNode.port(), "INTERNAL"),
+                                    new Endpoint(serverNode2.host(), 
serverNode2.port(), "CLIENT")),
                             service,
                             metricGroup,
                             
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
             nettyServer.start();
         }
     }
+
+    private static class MockMetricRegistry extends NOPMetricRegistry {
+
+        Map<String, Metric> registeredMetrics = new HashMap<>();
+
+        @Override
+        public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+            registeredMetrics.put(metricName, metric);
+        }
+
+        @Override
+        public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
+            registeredMetrics.remove(metricName, metric);
+        }
+    }
 }

Reply via email to