This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 699792d RATIS-648. Add metrics related to GrpcLogAppendRequests.
Contributed by Siddharth Wagle.
699792d is described below
commit 699792d56ca3828351b9e72cf983562eb8c11b21
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Thu Oct 10 14:55:11 2019 +0530
RATIS-648. Add metrics related to GrpcLogAppendRequests. Contributed by
Siddharth Wagle.
---
ratis-grpc/pom.xml | 13 ++-
.../ratis/grpc/metrics/GrpcServerMetrics.java | 92 ++++++++++++++++++
.../apache/ratis/grpc/server/GrpcLogAppender.java | 74 +++++++++++----
.../ratis/grpc/server/TestGrpcServerMetrics.java | 105 +++++++++++++++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 9 ++
.../ratis/server/metrics/RatisMetricNames.java | 1 +
.../apache/ratis/server/metrics/RatisMetrics.java | 6 ++
.../server/metrics/TestLeaderElectionMetrics.java | 2 -
8 files changed, 279 insertions(+), 23 deletions(-)
diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml
index 854138f..5b6ca6d 100644
--- a/ratis-grpc/pom.xml
+++ b/ratis-grpc/pom.xml
@@ -43,7 +43,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
<dependency>
<artifactId>ratis-client</artifactId>
<groupId>org.apache.ratis</groupId>
@@ -54,7 +53,6 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
<dependency>
<artifactId>ratis-server</artifactId>
<groupId>org.apache.ratis</groupId>
@@ -66,10 +64,19 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java
new file mode 100644
index 0000000..75da000
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/GrpcServerMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.metrics;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ratis.metrics.JVMMetrics;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.MetricsReporting;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+
+public class GrpcServerMetrics {
+ private final RatisMetricRegistry registry;
+ private static MetricsReporting metricsReporting = new MetricsReporting(500,
TimeUnit.MILLISECONDS);
+
+ private static final String RATIS_GRPC_METRICS_APP_NAME = "ratis_grpc";
+ private static final String RATIS_GRPC_METRICS_COMP_NAME = "log_appender";
+ private static final String RATIS_GRPC_METRICS_DESC = "Metrics for Ratis
Grpc Log Appender";
+
+ public static final String RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY =
+ "grpc_log_appender_follower_%s_latency";
+ public static final String RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS =
+ "grpc_log_appender_follower_%s_success_reply_count";
+ public static final String RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER =
+ "grpc_log_appender_follower_%s_not_leader_reply_count";
+ public static final String RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY =
+ "grpc_log_appender_follower_%s_inconsistency_reply_count";
+ public static final String RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT =
"grpc_log_appender_num_retries";
+ public static final String RATIS_GRPC_METRICS_REQUESTS_TOTAL =
"grpc_log_appender_num_requests";
+
+ public GrpcServerMetrics(String serverId) {
+ MetricRegistryInfo info = new MetricRegistryInfo(serverId,
RATIS_GRPC_METRICS_APP_NAME,
+ RATIS_GRPC_METRICS_COMP_NAME, RATIS_GRPC_METRICS_DESC);
+ Optional<RatisMetricRegistry> metricRegistry =
MetricRegistries.global().get(info);
+
+ registry = metricRegistry.orElseGet(() ->
MetricRegistries.global().create(info));
+
+ metricsReporting.startMetricsReporter(registry,
MetricsReporting.MetricReporterType.JMX,
+ MetricsReporting.MetricReporterType.HADOOP2);
+ // JVM metrics
+ JVMMetrics.startJVMReporting(1000, TimeUnit.MILLISECONDS,
MetricsReporting.MetricReporterType.JMX);
+ }
+
+ public Timer getGrpcLogAppenderLatencyTimer(String follower) {
+ return
registry.timer(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY,
follower));
+ }
+
+ public void onRequestRetry() {
+ registry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).inc();
+ }
+
+ public void onRequestCreate() {
+ registry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).inc();
+ }
+
+ public void onRequestSuccess(String follower) {
+ registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS,
follower)).inc();
+ }
+
+ public void onRequestNotLeader(String follower) {
+
registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER,
follower)).inc();
+ }
+
+ public void onRequestInconsistency(String follower) {
+
registry.counter(String.format(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY,
follower)).inc();
+ }
+
+ @VisibleForTesting
+ public RatisMetricRegistry getRegistry() {
+ return registry;
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 549426b..6374c31 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
@@ -41,6 +42,8 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.codahale.metrics.Timer;
+
/**
* A new log appender implementation using grpc bi-directional stream API.
*/
@@ -48,7 +51,7 @@ public class GrpcLogAppender extends LogAppender {
public static final Logger LOG =
LoggerFactory.getLogger(GrpcLogAppender.class);
private final GrpcService rpcService;
- private final Map<Long, AppendEntriesRequestProto> pendingRequests;
+ private final Map<Long, AppendEntriesRequest> pendingRequests;
private final int maxPendingRequestsNum;
private long callId = 0;
private volatile boolean firstResponseReceived = false;
@@ -59,6 +62,8 @@ public class GrpcLogAppender extends LogAppender {
private volatile StreamObserver<AppendEntriesRequestProto>
appendLogRequestObserver;
+ private final GrpcServerMetrics grpcServerMetrics;
+
public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState,
FollowerInfo f) {
super(server, leaderState, f);
@@ -71,6 +76,7 @@ public class GrpcLogAppender extends LogAppender {
pendingRequests = new ConcurrentHashMap<>();
installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(
server.getProxy().getProperties());
+ grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
}
private GrpcServerProtocolClient getClient() throws IOException {
@@ -160,17 +166,20 @@ public class GrpcLogAppender extends LogAppender {
}
private void appendLog() throws IOException {
- final AppendEntriesRequestProto pending;
+ final AppendEntriesRequest request;
final StreamObserver<AppendEntriesRequestProto> s;
synchronized (this) {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
- pending = createRequest(callId++);
+ AppendEntriesRequestProto pending = createRequest(callId++);
if (pending == null) {
return;
}
- pendingRequests.put(pending.getServerRequest().getCallId(), pending);
+ grpcServerMetrics.onRequestCreate();
+ request = new AppendEntriesRequest(pending,
+
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(getFollowerId().toString()));
+ pendingRequests.put(pending.getServerRequest().getCallId(), request);
increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
appendLogRequestObserver = getClient().appendEntries(new
AppendLogResponseHandler());
@@ -179,25 +188,26 @@ public class GrpcLogAppender extends LogAppender {
}
if (isAppenderRunning()) {
- sendRequest(pending, s);
+ sendRequest(request, s);
}
}
- private void sendRequest(AppendEntriesRequestProto request,
- StreamObserver<AppendEntriesRequestProto> s) {
+ private void sendRequest(AppendEntriesRequest request,
StreamObserver<AppendEntriesRequestProto> s) {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
server.getId(), null, request);
-
- s.onNext(request);
- scheduler.onTimeout(requestTimeoutDuration, () ->
timeoutAppendRequest(request), LOG,
+ AppendEntriesRequestProto requestProto = request.getRequestProto();
+ request.startRequestTimer();
+ s.onNext(requestProto);
+ scheduler.onTimeout(requestTimeoutDuration, () ->
timeoutAppendRequest(requestProto), LOG,
() -> "Timeout check failed for append entry request: " + request);
follower.updateLastRpcSendTime();
}
private void timeoutAppendRequest(AppendEntriesRequestProto request) {
- AppendEntriesRequestProto pendingRequest =
pendingRequests.remove(request.getServerRequest().getCallId());
+ AppendEntriesRequest pendingRequest =
pendingRequests.remove(request.getServerRequest().getCallId());
if (pendingRequest != null) {
- LOG.warn( "{}: appendEntries Timeout, request={}", this,
ServerProtoUtils.toString(pendingRequest));
+ LOG.warn( "{}: appendEntries Timeout, request={}", this,
+ ServerProtoUtils.toString(pendingRequest.getRequestProto()));
}
}
@@ -224,17 +234,19 @@ public class GrpcLogAppender extends LogAppender {
*/
@Override
public void onNext(AppendEntriesReplyProto reply) {
- final AppendEntriesRequestProto request =
pendingRequests.remove(reply.getServerReply().getCallId());
+ final AppendEntriesRequest request =
pendingRequests.remove(reply.getServerReply().getCallId());
+ AppendEntriesRequestProto requestProto = request.getRequestProto();
if (LOG.isDebugEnabled()) {
LOG.debug("{}: received {} reply {}, request={}",
this, firstResponseReceived? "a": "the first",
- ServerProtoUtils.toString(reply),
ServerProtoUtils.toString(request));
+ ServerProtoUtils.toString(reply),
ServerProtoUtils.toString(requestProto));
}
+ request.stopRequestTimer(); // Update completion time
try {
- onNextImpl(request, reply);
+ onNextImpl(requestProto, reply);
} catch(Throwable t) {
- LOG.error("Failed onNext request=" + ServerProtoUtils.toString(request)
+ LOG.error("Failed onNext request=" +
ServerProtoUtils.toString(requestProto)
+ ", reply=" + ServerProtoUtils.toString(reply), t);
}
}
@@ -254,17 +266,20 @@ public class GrpcLogAppender extends LogAppender {
switch (reply.getResult()) {
case SUCCESS:
+ grpcServerMetrics.onRequestSuccess(getFollowerId().toString());
updateCommitIndex(reply.getFollowerCommit());
if (checkAndUpdateMatchIndex(request)) {
submitEventOnSuccessAppend();
}
break;
case NOT_LEADER:
+ grpcServerMetrics.onRequestNotLeader(getFollowerId().toString());
if (checkResponseTerm(reply.getTerm())) {
return;
}
break;
case INCONSISTENCY:
+ grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
updateNextIndex(reply.getNextIndex());
break;
default:
@@ -283,9 +298,9 @@ public class GrpcLogAppender extends LogAppender {
return;
}
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
-
+ grpcServerMetrics.onRequestRetry(); // Update try counter
long callId = GrpcUtil.getCallId(t);
- resetClient(pendingRequests.remove(callId));
+ resetClient(pendingRequests.remove(callId).getRequestProto());
}
@Override
@@ -504,4 +519,27 @@ public class GrpcLogAppender extends LogAppender {
}
return null;
}
+
+ static class AppendEntriesRequest {
+ private final AppendEntriesRequestProto requestProto;
+ private final Timer timer;
+ private Timer.Context timerContext;
+
+ AppendEntriesRequest(AppendEntriesRequestProto requestProto, Timer timer) {
+ this.requestProto = requestProto;
+ this.timer = timer;
+ }
+
+ AppendEntriesRequestProto getRequestProto() {
+ return requestProto;
+ }
+
+ void startRequestTimer() {
+ timerContext = timer.time();
+ }
+
+ void stopRequestTimer() {
+ timerContext.stop();
+ }
+ }
}
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java
new file mode 100644
index 0000000..f868b31
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/server/TestGrpcServerMetrics.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY;
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY;
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER;
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS;
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_REQUESTS_TOTAL;
+import static
org.apache.ratis.grpc.metrics.GrpcServerMetrics.RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.function.Consumer;
+
+import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestGrpcServerMetrics {
+ private static GrpcServerMetrics grpcServerMetrics;
+ private static RatisMetricRegistry ratisMetricRegistry;
+ private static RaftGroupId raftGroupId;
+ private static RaftPeerId raftPeerId;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ RaftServerImpl raftServer = mock(RaftServerImpl.class);
+ ServerState serverStateMock = mock(ServerState.class);
+ when(raftServer.getState()).thenReturn(serverStateMock);
+ when(serverStateMock.getLastLeaderElapsedTimeMs()).thenReturn(1000L);
+ raftGroupId = RaftGroupId.randomId();
+ raftPeerId = RaftPeerId.valueOf("TestId");
+ RaftGroupMemberId raftGroupMemberId =
RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
+ when(raftServer.getMemberId()).thenReturn(raftGroupMemberId);
+ grpcServerMetrics = new GrpcServerMetrics(raftGroupMemberId.toString());
+ ratisMetricRegistry = grpcServerMetrics.getRegistry();
+ }
+
+ @Test
+ public void testGrpcLogAppenderLatencyTimer() throws Exception {
+ RaftProtos.AppendEntriesRequestProto.Builder proto =
RaftProtos.AppendEntriesRequestProto.newBuilder();
+ GrpcLogAppender.AppendEntriesRequest req =
+ new GrpcLogAppender.AppendEntriesRequest(proto.build(),
+
grpcServerMetrics.getGrpcLogAppenderLatencyTimer(raftPeerId.toString()));
+ Assert.assertEquals(0L, ratisMetricRegistry.timer(String.format(
+ RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY,
raftPeerId.toString())).getSnapshot().getMax());
+ req.startRequestTimer();
+ Thread.sleep(1000L);
+ req.stopRequestTimer();
+ Assert.assertTrue(ratisMetricRegistry.timer(String.format(
+ RATIS_GRPC_METRICS_LOG_APPENDER_LATENCY,
raftPeerId.toString())).getSnapshot().getMax() > 1000L);
+ }
+
+ @Test
+ public void testGrpcLogRequestTotal() {
+ Assert.assertEquals(0L,
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).getCount());
+ grpcServerMetrics.onRequestCreate();
+ Assert.assertEquals(1L,
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUESTS_TOTAL).getCount());
+ }
+
+ @Test
+ public void testGrpcLogRequestRetry() {
+ Assert.assertEquals(0L,
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).getCount());
+ grpcServerMetrics.onRequestRetry();
+ Assert.assertEquals(1L,
ratisMetricRegistry.counter(RATIS_GRPC_METRICS_REQUEST_RETRY_COUNT).getCount());
+ }
+
+ @Test
+ public void testGrpcLogAppenderRequestCounters() {
+ assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_SUCCESS,
grpcServerMetrics::onRequestSuccess);
+ assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_NOT_LEADER,
grpcServerMetrics::onRequestNotLeader);
+ assertCounterIncremented(RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY,
grpcServerMetrics::onRequestInconsistency);
+ }
+
+ private void assertCounterIncremented(String counterVar, Consumer<String>
incFunction) {
+ String counter = String.format(counterVar, raftPeerId.toString());
+ Assert.assertEquals(0L, ratisMetricRegistry.counter(counter).getCount());
+ incFunction.accept(raftPeerId.toString());
+ Assert.assertEquals(1L, ratisMetricRegistry.counter(counter).getCount());
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0a1fd49..3b724c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -25,6 +26,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
@@ -61,6 +64,8 @@ import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
+import com.codahale.metrics.Timer;
+
public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
public static final Logger LOG =
LoggerFactory.getLogger(RaftServerImpl.class);
@@ -88,6 +93,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
private final RaftServerJmxAdapter jmxAdapter;
private final LeaderElectionMetrics leaderElectionMetricsRegistry;
+ private final RatisMetricRegistry raftServerMetricsRegistry;
private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
@@ -116,6 +122,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetricsRegistry = getLeaderElectionMetrics(this);
+ this.raftServerMetricsRegistry =
RatisMetrics.getMetricsRegistryForServer(id.toString());
}
private RetryCache initRetryCache(RaftProperties prop) {
@@ -911,6 +918,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
+ Timer.Context timer =
raftServerMetricsRegistry.timer(RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY).time();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
@@ -971,6 +979,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
logAppendEntries(isHeartbeat, () ->
getMemberId() + ": succeeded to handle AppendEntries. Reply: " +
ServerProtoUtils.toString(reply));
+ timer.stop(); // TODO: future never completes exceptionally?
return reply;
});
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index 7bc1033..57f7d9a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -93,4 +93,5 @@ public final class RatisMetricNames {
// Time required to load and process raft log segments during restart
public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY =
"segmentLoadLatency";
+ public static final String FOLLOWER_APPEND_ENTRIES_LATENCY =
"follower_append_entry_latency";
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 384b85f..2877351 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -37,6 +37,8 @@ public class RatisMetrics {
public static final String RATIS_LEADER_METRICS_DESC = "Metrics for Ratis
Leader.";
public static final String RATIS_STATEMACHINE_METRICS =
"ratis_state_machine";
public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for
State Machine Updater";
+ public static final String RATIS_SERVER_METRICS = "server";
+ public static final String RATIS_SERVER_METRICS_DESC = "Metrics for Raft
server";
static MetricsReporting metricsReporting = new MetricsReporting(500,
TimeUnit.MILLISECONDS);
@@ -88,4 +90,8 @@ public class RatisMetrics {
return ratisMetricRegistry.orElse(null);
}
+ public static RatisMetricRegistry getMetricsRegistryForServer(String
serverId) {
+ return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
+ RATIS_SERVER_METRICS_DESC));
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
index b17348e..44750a1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
@@ -35,8 +35,6 @@ import org.apache.ratis.server.impl.ServerState;
import org.junit.BeforeClass;
import org.junit.Test;
-import com.google.protobuf.ByteString;
-
/**
* Test for LeaderElectionMetrics.
*/