This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b94b2b698 [client] Aggregate Fluss client connection metrics to reduce
the number of metrics (#1896)
b94b2b698 is described below
commit b94b2b698e2ba686e6580e0e2876b31eb37b3abf
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Nov 5 15:27:42 2025 +0800
[client] Aggregate Fluss client connection metrics to reduce the number of
metrics (#1896)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 17 ++--
.../fluss/rpc/metrics/ClientMetricGroup.java | 78 +++++++++++++-
...tionMetricGroup.java => ConnectionMetrics.java} | 73 +++++++------
.../fluss/rpc/netty/client/ServerConnection.java | 22 ++--
.../fluss/rpc/TestingTabletGatewayService.java | 2 +-
.../rpc/netty/client/ServerConnectionTest.java | 113 +++++++++++++++++++--
6 files changed, 239 insertions(+), 66 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index be8d644e7..4008c6b19 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -143,12 +143,17 @@ public class MetricNames {
//
--------------------------------------------------------------------------------------------
// metrics for rpc client
//
--------------------------------------------------------------------------------------------
- public static final String CLIENT_REQUESTS_RATE = "requestsPerSecond";
- public static final String CLIENT_RESPONSES_RATE = "responsesPerSecond";
- public static final String CLIENT_BYTES_IN_RATE = "bytesInPerSecond";
- public static final String CLIENT_BYTES_OUT_RATE = "bytesOutPerSecond";
- public static final String CLIENT_REQUEST_LATENCY_MS = "requestLatencyMs";
- public static final String CLIENT_REQUESTS_IN_FLIGHT = "requestsInFlight";
+ public static final String CLIENT_REQUESTS_RATE_AVG =
"requestsPerSecond_avg";
+ public static final String CLIENT_REQUESTS_RATE_TOTAL =
"requestsPerSecond_total";
+ public static final String CLIENT_RESPONSES_RATE_AVG =
"responsesPerSecond_avg";
+ public static final String CLIENT_RESPONSES_RATE_TOTAL =
"responsesPerSecond_total";
+ public static final String CLIENT_BYTES_IN_RATE_AVG =
"bytesInPerSecond_avg";
+ public static final String CLIENT_BYTES_IN_RATE_TOTAL =
"bytesInPerSecond_total";
+ public static final String CLIENT_BYTES_OUT_RATE_AVG =
"bytesOutPerSecond_avg";
+ public static final String CLIENT_BYTES_OUT_RATE_TOTAL =
"bytesOutPerSecond_total";
+ public static final String CLIENT_REQUEST_LATENCY_MS_AVG =
"requestLatencyMs_avg";
+ public static final String CLIENT_REQUEST_LATENCY_MS_MAX =
"requestLatencyMs_max";
+ public static final String CLIENT_REQUESTS_IN_FLIGHT_TOTAL =
"requestsInFlight_total";
//
--------------------------------------------------------------------------------------------
// metrics for client
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
index 163ee8fce..02abeaac7 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ClientMetricGroup.java
@@ -18,13 +18,18 @@
package org.apache.fluss.rpc.metrics;
import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import org.apache.fluss.utils.MapUtils;
import java.util.Map;
+import java.util.function.ToLongFunction;
/** The metric group for clients. */
public class ClientMetricGroup extends AbstractMetricGroup {
+ private final Map<String, ConnectionMetrics> nodeToConnectionMetrics =
+ MapUtils.newConcurrentHashMap();
private static final String NAME = "client";
@@ -33,6 +38,39 @@ public class ClientMetricGroup extends AbstractMetricGroup {
public ClientMetricGroup(MetricRegistry registry, String clientId) {
super(registry, new String[] {NAME}, null);
this.clientId = clientId;
+ this.gauge(
+ MetricNames.CLIENT_REQUESTS_RATE_AVG,
+ () -> getMetricsAvg(ConnectionMetrics.Metrics::requestRate));
+ this.gauge(
+ MetricNames.CLIENT_REQUESTS_RATE_TOTAL,
+ () -> getMetricsSum(ConnectionMetrics.Metrics::requestRate));
+ this.gauge(
+ MetricNames.CLIENT_RESPONSES_RATE_AVG,
+ () -> getMetricsAvg(ConnectionMetrics.Metrics::responseRate));
+ this.gauge(
+ MetricNames.CLIENT_RESPONSES_RATE_TOTAL,
+ () -> getMetricsSum(ConnectionMetrics.Metrics::responseRate));
+ this.gauge(
+ MetricNames.CLIENT_BYTES_IN_RATE_AVG,
+ () -> getMetricsAvg(ConnectionMetrics.Metrics::byteInRate));
+ this.gauge(
+ MetricNames.CLIENT_BYTES_IN_RATE_TOTAL,
+ () -> getMetricsSum(ConnectionMetrics.Metrics::byteInRate));
+ this.gauge(
+ MetricNames.CLIENT_BYTES_OUT_RATE_AVG,
+ () -> getMetricsAvg(ConnectionMetrics.Metrics::byteOutRate));
+ this.gauge(
+ MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL,
+ () -> getMetricsSum(ConnectionMetrics.Metrics::byteOutRate));
+ this.gauge(
+ MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG,
+ () ->
getMetricsAvg(ConnectionMetrics.Metrics::requestLatencyMs));
+ this.gauge(
+ MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX,
+ () ->
getMetricsMax(ConnectionMetrics.Metrics::requestLatencyMs));
+ this.gauge(
+ MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL,
+ () ->
getMetricsSum(ConnectionMetrics.Metrics::requestsInFlight));
}
@Override
@@ -49,7 +87,43 @@ public class ClientMetricGroup extends AbstractMetricGroup {
return registry;
}
- public ConnectionMetricGroup createConnectionMetricGroup(String serverId) {
- return new ConnectionMetricGroup(registry, serverId, this);
+ public ConnectionMetrics createConnectionMetricGroup(String serverId) {
+ // Only expose aggregate metrics to reduce the reporter pressure.
+ ConnectionMetrics connectionMetrics = new ConnectionMetrics(serverId,
this);
+ nodeToConnectionMetrics.put(serverId, connectionMetrics);
+ return connectionMetrics;
+ }
+
+ public void removeConnectionMetricGroup(String serverId, ConnectionMetrics
connectionMetrics) {
+ nodeToConnectionMetrics.remove(serverId, connectionMetrics);
+ }
+
+ private double getMetricsAvg(ToLongFunction<ConnectionMetrics.Metrics>
metricGetter) {
+ return nodeToConnectionMetrics.values().stream()
+ .flatMap(
+ connectionMetricGroup ->
+
connectionMetricGroup.metricsByRequestName.values().stream())
+ .mapToLong(metricGetter)
+ .average()
+ .orElse(0);
+ }
+
+ private long getMetricsSum(ToLongFunction<ConnectionMetrics.Metrics>
metricGetter) {
+ return nodeToConnectionMetrics.values().stream()
+ .flatMap(
+ connectionMetricGroup ->
+
connectionMetricGroup.metricsByRequestName.values().stream())
+ .mapToLong(metricGetter)
+ .sum();
+ }
+
+ private long getMetricsMax(ToLongFunction<ConnectionMetrics.Metrics>
metricGetter) {
+ return nodeToConnectionMetrics.values().stream()
+ .flatMap(
+ connectionMetricGroup ->
+
connectionMetricGroup.metricsByRequestName.values().stream())
+ .mapToLong(metricGetter)
+ .max()
+ .orElse(0);
}
}
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
similarity index 61%
rename from
fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java
rename to
fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
index 849e6a22a..6b1fd9064 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetricGroup.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/metrics/ConnectionMetrics.java
@@ -17,14 +17,8 @@
package org.apache.fluss.rpc.metrics;
-import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
-import org.apache.fluss.metrics.groups.AbstractMetricGroup;
-import org.apache.fluss.metrics.groups.MetricGroup;
-import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.utils.MapUtils;
@@ -35,38 +29,24 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
-
/** Metrics for ServerConnection with {@link ClientMetricGroup} as parent
group. */
-public class ConnectionMetricGroup extends AbstractMetricGroup {
+public class ConnectionMetrics {
private static final List<ApiKeys> REPORT_API_KEYS =
Arrays.asList(ApiKeys.PRODUCE_LOG, ApiKeys.FETCH_LOG,
ApiKeys.PUT_KV, ApiKeys.LOOKUP);
private final String serverId;
+ private final ClientMetricGroup clientMetricGroup;
/** Metrics for different request/response metrics with specify {@link
ApiKeys}. */
- private final Map<String, Metrics> metricsByRequestName =
MapUtils.newConcurrentHashMap();
+ final Map<String, Metrics> metricsByRequestName =
MapUtils.newConcurrentHashMap();
- public ConnectionMetricGroup(
- MetricRegistry registry, String serverId, ClientMetricGroup
parent) {
- super(registry, makeScope(parent, serverId), parent);
+ public ConnectionMetrics(String serverId, ClientMetricGroup
clientMetricGroup) {
this.serverId = serverId;
+ this.clientMetricGroup = clientMetricGroup;
}
- @Override
- protected void putVariables(Map<String, String> variables) {
- variables.put("server_id", serverId);
- }
-
- @Override
- protected String getGroupName(CharacterFilter filter) {
- return "";
- }
-
- @Override
- protected String createLogicalScope(CharacterFilter filter, char
delimiter) {
- // ignore this metric group name in logical scope
- return parent.getLogicalScope(filter, delimiter);
+ public void close() {
+ clientMetricGroup.removeConnectionMetricGroup(serverId, this);
}
// ------------------------------------------------------------------------
@@ -93,16 +73,15 @@ public class ConnectionMetricGroup extends
AbstractMetricGroup {
}
@Nullable
- private Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
+ Metrics getOrCreateRequestMetrics(ApiKeys apikey) {
if (!REPORT_API_KEYS.contains(apikey)) {
return null;
}
- return metricsByRequestName.computeIfAbsent(
- apikey.name(), keyName -> new Metrics(this.addGroup("request",
keyName)));
+ return metricsByRequestName.computeIfAbsent(apikey.name(), keyName ->
new Metrics());
}
- private static final class Metrics {
+ static final class Metrics {
final Counter requests;
final Counter responses;
final Counter inComingBytes;
@@ -111,18 +90,36 @@ public class ConnectionMetricGroup extends
AbstractMetricGroup {
volatile long requestLatencyMs;
final AtomicInteger requestsInFlight;
- private Metrics(MetricGroup metricGroup) {
+ private Metrics() {
requests = new ThreadSafeSimpleCounter();
- metricGroup.meter(MetricNames.CLIENT_REQUESTS_RATE, new
MeterView(requests));
responses = new ThreadSafeSimpleCounter();
- metricGroup.meter(MetricNames.CLIENT_RESPONSES_RATE, new
MeterView(responses));
inComingBytes = new ThreadSafeSimpleCounter();
- metricGroup.meter(MetricNames.CLIENT_BYTES_IN_RATE, new
MeterView(inComingBytes));
outGoingBytes = new ThreadSafeSimpleCounter();
- metricGroup.meter(MetricNames.CLIENT_BYTES_OUT_RATE, new
MeterView(outGoingBytes));
- metricGroup.gauge(MetricNames.CLIENT_REQUEST_LATENCY_MS, () ->
requestLatencyMs);
requestsInFlight = new AtomicInteger(0);
- metricGroup.gauge(MetricNames.CLIENT_REQUESTS_IN_FLIGHT,
requestsInFlight::get);
+ }
+
+ long requestRate() {
+ return requests.getCount();
+ }
+
+ long responseRate() {
+ return responses.getCount();
+ }
+
+ long byteInRate() {
+ return inComingBytes.getCount();
+ }
+
+ long byteOutRate() {
+ return outGoingBytes.getCount();
+ }
+
+ long requestLatencyMs() {
+ return requestLatencyMs;
+ }
+
+ long requestsInFlight() {
+ return requestsInFlight.get();
}
}
}
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 3aa85e24f..a09a50c09 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -29,7 +29,7 @@ import org.apache.fluss.rpc.messages.ApiVersionsResponse;
import org.apache.fluss.rpc.messages.AuthenticateRequest;
import org.apache.fluss.rpc.messages.AuthenticateResponse;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
-import org.apache.fluss.rpc.metrics.ConnectionMetricGroup;
+import org.apache.fluss.rpc.metrics.ConnectionMetrics;
import org.apache.fluss.rpc.protocol.ApiKeys;
import org.apache.fluss.rpc.protocol.ApiManager;
import org.apache.fluss.rpc.protocol.ApiMethod;
@@ -72,7 +72,7 @@ final class ServerConnection {
// TODO: add max inflight requests limit like Kafka's
"max.in.flight.requests.per.connection"
private final Map<Integer, InflightRequest> inflightRequests =
MapUtils.newConcurrentHashMap();
private final CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
- private final ConnectionMetricGroup connectionMetricGroup;
+ private final ConnectionMetrics connectionMetrics;
private final ClientAuthenticator authenticator;
private final ExponentialBackoff backoff;
@@ -108,7 +108,7 @@ final class ServerConnection {
boolean isInnerClient) {
this.node = node;
this.state = ConnectionState.CONNECTING;
- this.connectionMetricGroup =
clientMetricGroup.createConnectionMetricGroup(node.uid());
+ this.connectionMetrics =
clientMetricGroup.createConnectionMetricGroup(node.uid());
this.authenticator = authenticator;
this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
whenClose(closeCallback);
@@ -190,7 +190,7 @@ final class ServerConnection {
closeFuture.completeExceptionally(cause);
}
- connectionMetricGroup.close();
+ connectionMetrics.close();
}
closeQuietly(authenticator);
@@ -220,7 +220,7 @@ final class ServerConnection {
public void onRequestResult(int requestId, ApiMessage response) {
InflightRequest request = inflightRequests.remove(requestId);
if (request != null && !request.responseFuture.isDone()) {
- connectionMetricGroup.updateMetricsAfterGetResponse(
+ connectionMetrics.updateMetricsAfterGetResponse(
ApiKeys.forId(request.apiKey),
request.requestStartTime,
response.totalSize());
@@ -232,7 +232,7 @@ final class ServerConnection {
public void onRequestFailure(int requestId, Throwable cause) {
InflightRequest request = inflightRequests.remove(requestId);
if (request != null && !request.responseFuture.isDone()) {
- connectionMetricGroup.updateMetricsAfterGetResponse(
+ connectionMetrics.updateMetricsAfterGetResponse(
ApiKeys.forId(request.apiKey),
request.requestStartTime, 0);
request.responseFuture.completeExceptionally(cause);
}
@@ -329,14 +329,14 @@ final class ServerConnection {
return responseFuture;
}
- connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey,
rawRequest.totalSize());
+ connectionMetrics.updateMetricsBeforeSendRequest(apiKey,
rawRequest.totalSize());
channel.writeAndFlush(byteBuf)
.addListener(
(ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
-
connectionMetricGroup.updateMetricsAfterGetResponse(
+
connectionMetrics.updateMetricsAfterGetResponse(
apiKey,
inflight.requestStartTime, 0);
Throwable cause = future.cause();
if (cause instanceof IOException) {
@@ -449,8 +449,10 @@ final class ServerConnection {
}
private void switchState(ConnectionState targetState) {
- LOG.debug("switch state form {} to {}", state, targetState);
- state = targetState;
+ if (state != ConnectionState.DISCONNECTED) {
+ LOG.debug("switch state form {} to {}", state, targetState);
+ state = targetState;
+ }
if (targetState == ConnectionState.READY) {
// process pending requests
PendingRequest pending;
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 30ef735cd..7db365438 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -122,7 +122,7 @@ public class TestingTabletGatewayService extends
TestingGatewayService
@Override
public CompletableFuture<LookupResponse> lookup(LookupRequest request) {
- return null;
+ return CompletableFuture.completedFuture(new LookupResponse());
}
@Override
diff --git
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 6b2363c88..554926ac4 100644
---
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -22,11 +22,20 @@ import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.metrics.Gauge;
+import org.apache.fluss.metrics.Metric;
+import org.apache.fluss.metrics.MetricType;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.groups.MetricGroup;
+import org.apache.fluss.metrics.registry.NOPMetricRegistry;
import org.apache.fluss.metrics.util.NOPMetricsGroup;
import org.apache.fluss.rpc.TestingGatewayService;
+import org.apache.fluss.rpc.TestingTabletGatewayService;
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.LookupRequest;
+import org.apache.fluss.rpc.messages.PbLookupReqForBucket;
import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
import org.apache.fluss.rpc.netty.server.NettyServer;
@@ -42,9 +51,23 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_IN_RATE_TOTAL;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_BYTES_OUT_RATE_TOTAL;
+import static
org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_IN_FLIGHT_TOTAL;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_REQUESTS_RATE_TOTAL;
+import static
org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_AVG;
+import static
org.apache.fluss.metrics.MetricNames.CLIENT_REQUEST_LATENCY_MS_MAX;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_AVG;
+import static org.apache.fluss.metrics.MetricNames.CLIENT_RESPONSES_RATE_TOTAL;
import static
org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
import static org.apache.fluss.utils.NetUtils.getAvailablePort;
@@ -60,12 +83,13 @@ public class ServerConnectionTest {
private Configuration conf;
private NettyServer nettyServer;
private ServerNode serverNode;
+ private ServerNode serverNode2;
private TestingGatewayService service;
@BeforeEach
void setUp() throws Exception {
conf = new Configuration();
- buildNettyServer(0);
+ buildNettyServer();
eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test");
bootstrap =
@@ -121,22 +145,93 @@ public class ServerConnectionTest {
assertThat(future.isDone()).isTrue();
}
- private void buildNettyServer(int serverId) throws Exception {
- try (NetUtils.Port availablePort = getAvailablePort()) {
+ @Test
+ void testConnectionMetrics() throws ExecutionException,
InterruptedException {
+ MockMetricRegistry metricRegistry = new MockMetricRegistry();
+ ClientMetricGroup client = new ClientMetricGroup(metricRegistry,
"client");
+ ServerConnection connection =
+ new ServerConnection(
+ bootstrap,
+ serverNode,
+ client,
+ clientAuthenticator,
+ (con, ignore) -> {},
+ false);
+ ServerConnection connection2 =
+ new ServerConnection(
+ bootstrap,
+ serverNode2,
+ client,
+ clientAuthenticator,
+ (con, ignore) -> {},
+ false);
+ LookupRequest request = new LookupRequest().setTableId(1);
+ PbLookupReqForBucket pbLookupReqForBucket = request.addBucketsReq();
+ pbLookupReqForBucket.setBucketId(1);
+ assertThat(metricRegistry.registeredMetrics).hasSize(11);
+
+ connection.send(ApiKeys.LOOKUP, request).get();
+ connection2.send(ApiKeys.LOOKUP, request).get();
+
+ assertThat(metricRegistry.registeredMetrics).hasSize(11);
+ assertThat(metricRegistry.registeredMetrics.keySet())
+ .containsExactlyInAnyOrder(
+ CLIENT_REQUESTS_RATE_AVG,
+ CLIENT_REQUESTS_RATE_TOTAL,
+ CLIENT_RESPONSES_RATE_AVG,
+ CLIENT_RESPONSES_RATE_TOTAL,
+ CLIENT_BYTES_IN_RATE_AVG,
+ CLIENT_BYTES_IN_RATE_TOTAL,
+ CLIENT_BYTES_OUT_RATE_AVG,
+ CLIENT_BYTES_OUT_RATE_TOTAL,
+ CLIENT_REQUEST_LATENCY_MS_AVG,
+ CLIENT_REQUEST_LATENCY_MS_MAX,
+ CLIENT_REQUESTS_IN_FLIGHT_TOTAL);
+ Metric metric =
metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_AVG);
+ assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
+ assertThat(((Gauge<?>) metric).getValue()).isEqualTo(1.0);
+ metric =
metricRegistry.registeredMetrics.get(CLIENT_REQUESTS_RATE_TOTAL);
+ assertThat(metric.getMetricType()).isEqualTo(MetricType.GAUGE);
+ assertThat(((Gauge<?>) metric).getValue()).isEqualTo(2L);
+ connection.close().get();
+ }
+
+ private void buildNettyServer() throws Exception {
+ try (NetUtils.Port availablePort = getAvailablePort();
+ NetUtils.Port availablePort2 = getAvailablePort()) {
serverNode =
new ServerNode(
- serverId, "localhost", availablePort.getPort(),
ServerType.COORDINATOR);
- service = new TestingGatewayService();
+ 1, "localhost", availablePort.getPort(),
ServerType.TABLET_SERVER);
+ serverNode2 =
+ new ServerNode(
+ 2, "localhost", availablePort2.getPort(),
ServerType.TABLET_SERVER);
+ service = new TestingTabletGatewayService();
MetricGroup metricGroup = NOPMetricsGroup.newInstance();
nettyServer =
new NettyServer(
conf,
- Collections.singleton(
- new Endpoint(serverNode.host(),
serverNode.port(), "INTERNAL")),
+ Arrays.asList(
+ new Endpoint(serverNode.host(),
serverNode.port(), "INTERNAL"),
+ new Endpoint(serverNode2.host(),
serverNode2.port(), "CLIENT")),
service,
metricGroup,
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
nettyServer.start();
}
}
+
+ private static class MockMetricRegistry extends NOPMetricRegistry {
+
+ Map<String, Metric> registeredMetrics = new HashMap<>();
+
+ @Override
+ public void register(Metric metric, String metricName,
AbstractMetricGroup group) {
+ registeredMetrics.put(metricName, metric);
+ }
+
+ @Override
+ public void unregister(Metric metric, String metricName,
AbstractMetricGroup group) {
+ registeredMetrics.remove(metricName, metric);
+ }
+ }
}