This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 30bafb7 RATIS-651. Add metrics related to leaderElection and
HeartBeat. Contributed by Aravindan Vijayan.
30bafb7 is described below
commit 30bafb7a7fc91febfc3bf5b08a97f06d7f4ef810
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Wed Aug 28 10:30:30 2019 +0530
RATIS-651. Add metrics related to leaderElection and HeartBeat. Contributed
by Aravindan Vijayan.
---
.../apache/ratis/server/impl/FollowerState.java | 1 +
.../apache/ratis/server/impl/LeaderElection.java | 3 +
.../org/apache/ratis/server/impl/LeaderState.java | 19 ++++-
.../org/apache/ratis/server/impl/LogAppender.java | 2 +
.../apache/ratis/server/impl/RaftServerImpl.java | 9 +++
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../ratis/server/metrics/HeartbeatMetrics.java | 63 +++++++++++++++
.../server/metrics/LeaderElectionMetrics.java | 58 ++++++++++++++
.../ratis/server/metrics/RatisMetricNames.java | 33 ++++++++
.../apache/ratis/server/metrics/RatisMetrics.java | 15 ++++
.../java/org/apache/ratis/LogAppenderTests.java | 51 +++++++++++++
.../ratis/server/impl/LeaderElectionTests.java | 31 ++++++++
.../server/metrics/TestLeaderElectionMetrics.java | 89 ++++++++++++++++++++++
.../ratis/TestRaftServerSlownessDetection.java | 21 ++++-
14 files changed, 393 insertions(+), 4 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index bbb7436..c20b1a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -105,6 +105,7 @@ class FollowerState extends Daemon {
if (outstandingOp.get() == 0 && lastRpcTime.elapsedTimeMs() >=
electionTimeout) {
LOG.info("{}:{} changes to CANDIDATE, lastRpcTime:{},
electionTimeout:{}ms",
server.getId(), server.getGroupId(),
lastRpcTime.elapsedTimeMs(), electionTimeout);
+
server.getLeaderElectionMetricsRegistry().onLeaderElectionTimeout(); // Update
timeout metric counters.
// election timeout, should become a candidate
server.changeToCandidate();
break;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 8a8dc07..3c81348 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -128,6 +128,7 @@ class LeaderElection implements Runnable {
@Override
public void run() {
+ Timestamp electionStartTime = Timestamp.currentTime();
try {
askForVotes();
} catch(Throwable e) {
@@ -149,6 +150,8 @@ class LeaderElection implements Runnable {
shutdown();
}
} finally {
+ // Update leader election completion metric(s).
+
server.getLeaderElectionMetricsRegistry().onLeaderElectionCompletion(electionStartTime.elapsedTimeMs());
lifeCycle.transition(LifeCycle.State.CLOSED);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e61c7e4..8750d29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -21,6 +21,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.HeartbeatMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -193,6 +194,7 @@ public class LeaderState {
private final int stagingCatchupGap;
private final TimeDuration syncInterval;
private final long placeHolderIndex;
+ private final HeartbeatMetrics heartbeatMetrics;
LeaderState(RaftServerImpl server, RaftProperties properties) {
this.server = server;
@@ -212,6 +214,7 @@ public class LeaderState {
placeHolderIndex = raftLog.getNextIndex();
senders = new SenderList();
+ heartbeatMetrics = HeartbeatMetrics.getHeartbeatMetrics(server);
addSenders(others, placeHolderIndex, true);
voterLists = divideFollowers(conf);
}
@@ -375,8 +378,11 @@ public class LeaderState {
Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long
nextIndex, boolean attendVote) {
final Timestamp t =
Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream()
- .map(peer -> server.newLogAppender(this, peer, t, nextIndex,
attendVote))
- .collect(Collectors.toList());
+ .map(peer -> {
+ LogAppender logAppender = server.newLogAppender(this, peer, t,
nextIndex, attendVote);
+
heartbeatMetrics.addFollower(logAppender.getFollower().getPeer().getId().toString());
+ return logAppender;
+ }).collect(Collectors.toList());
senders.addAll(newAppenders);
return newAppenders;
}
@@ -772,4 +778,13 @@ public class LeaderState {
Stream<LogAppender> getLogAppenders() {
return senders.stream();
}
+
+ /**
+ * Record Follower Heartbeat Elapsed Time.
+ * @param followerId Follower Peer ID.
+ * @param elapsedTime Elapsed time in Nanos.
+ */
+ void recordFollowerHeartbeatElapsedTime(String followerId, long elapsedTime)
{
+ heartbeatMetrics.recordFollowerHeartbeatElapsedTime(followerId,
elapsedTime);
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index f7b94a4..5768bc4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -527,6 +527,8 @@ public class LogAppender {
if (follower.isSlow()) {
server.getStateMachine().notifySlowness(server.getRoleInfoProto());
}
+
leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId().toString(),
+ follower.getLastRpcResponseTime().elapsedTime().getDuration());
}
public synchronized void notifyAppend() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 35daf5e..8395b06 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -24,6 +24,7 @@ import
org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
@@ -54,6 +55,7 @@ import java.util.stream.Collectors;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static
org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static
org.apache.ratis.server.metrics.LeaderElectionMetrics.getLeaderElectionMetrics;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
@@ -84,6 +86,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final RaftServerJmxAdapter jmxAdapter;
+ private final LeaderElectionMetrics leaderElectionMetricsRegistry;
private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
@@ -109,6 +112,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.jmxAdapter = new RaftServerJmxAdapter();
+ this.leaderElectionMetricsRegistry = getLeaderElectionMetrics(this);
}
private RetryCache initRetryCache(RaftProperties prop) {
@@ -411,6 +415,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
// start election
role.startLeaderElection(this);
+ leaderElectionMetricsRegistry.onNewLeaderElection();
}
@Override
@@ -1302,6 +1307,10 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
}
+ public LeaderElectionMetrics getLeaderElectionMetricsRegistry() {
+ return leaderElectionMetricsRegistry;
+ }
+
private class RaftServerJmxAdapter extends JmxRegister implements
RaftServerMXBean {
@Override
public String getId() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 05cda84..6805ae4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -263,7 +263,7 @@ public class ServerState implements Closeable {
.isPresent();
}
- long getLastLeaderElapsedTimeMs() {
+ public long getLastLeaderElapsedTimeMs() {
final Timestamp t = lastNoLeaderTime;
return t == null ? 0 : t.elapsedTimeMs();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/HeartbeatMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/HeartbeatMetrics.java
new file mode 100644
index 0000000..9a8c4bd
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/HeartbeatMetrics.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
import static org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.server.impl.RaftServerImpl;
+
+/**
+ * Metric Registry for Heartbeat. One instance per leader per group.
+ */
+public final class HeartbeatMetrics {
+
+ private RatisMetricRegistry registry = null;
+ private Map<String, Long> followerLastHeartbeatElapsedTimeMap = new
HashMap<>();
+
+ public static HeartbeatMetrics getHeartbeatMetrics(RaftServerImpl
raftServer) {
+ return new HeartbeatMetrics(raftServer.getMemberId().toString());
+ }
+
+ private HeartbeatMetrics(String serverId) {
+ registry = RatisMetrics.getMetricRegistryForHeartbeat(serverId);
+ }
+
+ /**
+ * Register a follower with this Heartbeat Metrics registry instance.
+ * @param followerName Name of the follower.
+ */
+ public void addFollower(String followerName) {
+ String followerMetricKey =
String.format(FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC, followerName);
+ followerLastHeartbeatElapsedTimeMap.put(followerName, 0L);
+ registry.gauge(followerMetricKey, () -> () ->
followerLastHeartbeatElapsedTimeMap.get(followerName));
+ }
+
+ /**
+ * Record heartbeat elapsed time for a follower within a Raft group.
+ * @param followerName Name of the follower.
+ * @param elapsedTime Elapsed time in Nanos.
+ */
+ public void recordFollowerHeartbeatElapsedTime(String followerName, long
elapsedTime) {
+ followerLastHeartbeatElapsedTimeMap.put(followerName, elapsedTime);
+ }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java
new file mode 100644
index 0000000..b6b1f65
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LeaderElectionMetrics.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LAST_LEADER_ELAPSED_TIME;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_COUNT_METRIC;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_LATENCY;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_TIMEOUT_COUNT_METRIC;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.server.impl.RaftServerImpl;
+
+/**
+ * Class to update the metrics related to Leader Election.
+ */
+public final class LeaderElectionMetrics {
+
+ private long leaderElectionCompletionLatency = 0L;
+ private RatisMetricRegistry ratisMetricRegistry;
+
+ private LeaderElectionMetrics(RaftServerImpl raftServer) {
+ this.ratisMetricRegistry =
RatisMetrics.getMetricRegistryForLeaderElection(raftServer.getMemberId().toString());
+ ratisMetricRegistry.gauge(LEADER_ELECTION_LATENCY, () -> () ->
leaderElectionCompletionLatency);
+ ratisMetricRegistry.gauge(LAST_LEADER_ELAPSED_TIME, () -> () ->
raftServer.getState().getLastLeaderElapsedTimeMs());
+ }
+
+ public static LeaderElectionMetrics getLeaderElectionMetrics(RaftServerImpl
raftServer) {
+ return new LeaderElectionMetrics(raftServer);
+ }
+
+ public void onNewLeaderElection() {
+ ratisMetricRegistry.counter(LEADER_ELECTION_COUNT_METRIC).inc();
+ }
+
+ public void onLeaderElectionCompletion(long elapsedTime) {
+ this.leaderElectionCompletionLatency = elapsedTime;
+ }
+
+ public void onLeaderElectionTimeout() {
+ ratisMetricRegistry.counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC).inc();
+ }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
new file mode 100644
index 0000000..08933b3
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
/**
 * Names of the leader election and heartbeat metrics published by the Ratis server.
 */
public final class RatisMetricNames {

  /** Counter: number of leader elections started by this server. */
  public static final String LEADER_ELECTION_COUNT_METRIC = "leader_election_count";

  /** Counter: number of times a follower's election timeout expired. */
  public static final String LEADER_ELECTION_TIMEOUT_COUNT_METRIC =
      "leader_election_timeout_count";

  /** Gauge: duration, in milliseconds, of the most recently ended leader election. */
  public static final String LEADER_ELECTION_LATENCY = "leader_election_latency";

  /** Gauge: value of {@code ServerState#getLastLeaderElapsedTimeMs()}, in milliseconds. */
  public static final String LAST_LEADER_ELAPSED_TIME = "last_leader_elapsed_time";

  /**
   * Format string (one {@code %s} for the follower id) naming the gauge that reports the
   * elapsed time, in nanos, since that follower's last heartbeat response.
   */
  public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC =
      "follower_%s_last_heartbeat_elapsed_time";

  private RatisMetricNames() {
    // Constants holder; not instantiable.
  }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 16cb1d1..4230dab 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -31,6 +31,11 @@ public class RatisMetrics {
public final static String RATIS_LOG_WORKER_METRICS_DESC = "Ratis metrics";
public final static String RATIS_LOG_WORKER_METRICS = "ratis_log_worker";
public final static String RATIS_APPLICATION_NAME_METRICS = "ratis_core";
+ public static final String RATIS_LEADER_ELECTION_METRICS = "leader_election";
+ public static final String RATIS_LEADER_ELECTION_METRICS_DESC = "Metrics for
Ratis Leader Election.";
+ public static final String RATIS_HEARTBEAT_METRICS = "heartbeat";
+ public static final String RATIS_HEARTBEAT_METRICS_DESC = "Metrics for Ratis
Heartbeat.";
+
static MetricsReporting metricsReporting = new MetricsReporting(500,
TimeUnit.MILLISECONDS);
public static RatisMetricRegistry createMetricRegistryForLogWorker(String
name) {
@@ -60,4 +65,14 @@ public class RatisMetrics {
return registry;
}
+
+ public static RatisMetricRegistry getMetricRegistryForLeaderElection(String
serverId) {
+ return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_LEADER_ELECTION_METRICS,
+ RATIS_LEADER_ELECTION_METRICS_DESC));
+ }
+
+ public static RatisMetricRegistry getMetricRegistryForHeartbeat(String
serverId) {
+ return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_HEARTBEAT_METRICS,
+ RATIS_HEARTBEAT_METRICS_DESC));
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 6d92bbe..2bc73f2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -17,17 +17,23 @@
*/
package org.apache.ratis;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import static org.junit.Assert.assertTrue;
+
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -36,9 +42,11 @@ import org.apache.ratis.util.SizeInBytes;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
+import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -46,6 +54,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.codahale.metrics.Gauge;
+
public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
@@ -114,6 +124,47 @@ public abstract class LogAppenderTests<CLUSTER extends
MiniRaftCluster>
runWithNewCluster(3, this::runTest);
}
+ @Test
+ public void testFollowerHeartbeatMetric() throws IOException,
InterruptedException {
+
+ // Start a 3 node Ratis ring.
+ final MiniRaftCluster cluster = newCluster(3);
+ cluster.start();
+ RaftServerImpl leaderServer = waitForLeader(cluster);
+
+ // Write 10 messages to leader.
+ try(RaftClient client = cluster.createClient(leaderServer.getId())) {
+ for (int i = 1; i <= 10; i++) {
+ client.send(new RaftTestUtil.SimpleMessage("Msg to make leader ready "
+ i));
+ }
+ } catch (IOException e) {
+ throw e;
+ }
+
+ RatisMetricRegistry ratisMetricRegistry =
RatisMetrics.getMetricRegistryForHeartbeat(
+ leaderServer.getMemberId().toString());
+
+ // Get all last_heartbeat_elapsed_time metric gauges. Should be equal to
number of followers.
+ SortedMap<String, Gauge> heartbeatElapsedTimeGauges =
ratisMetricRegistry.getGauges((s, metric) ->
+ s.contains("last_heartbeat_elapsed_time"));
+ assertTrue(heartbeatElapsedTimeGauges.size() == 2);
+
+ for (RaftServerImpl followerServer : cluster.getFollowers()) {
+ String followerId = followerServer.getId().toString();
+ Gauge metric =
heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e ->
e.getKey().contains(
+ followerId)).iterator().next().getValue();
+ // Metric for this follower exists.
+ assertTrue(metric != null);
+ // Metric in nanos > 0.
+ assertTrue((long)metric.getValue() > 0);
+ // Try to get Heartbeat metrics for follower.
+ RatisMetricRegistry followerMetricsRegistry =
RatisMetrics.getMetricRegistryForHeartbeat(followerServer
+ .getMemberId().toString());
+ // Metric should not exist. It only exists in leader.
+ assertTrue(followerMetricsRegistry.getGauges((s, m) ->
s.contains("last_heartbeat_elapsed_time")).isEmpty());
+ }
+ }
+
void runTest(CLUSTER cluster) throws Exception {
final int numMsgs = 10;
final int numClients = 5;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index abcd403..3818b41 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -22,22 +22,30 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_COUNT_METRIC;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_LATENCY;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_TIMEOUT_COUNT_METRIC;
+import static org.junit.Assert.assertTrue;
public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -136,4 +144,27 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
LOG.info(cluster.printServers());
Assert.assertEquals(leader.getId(), lastServerLeaderId);
}
+
+ @Test
+ public void testLeaderElectionMetrics() throws IOException,
InterruptedException {
+ LOG.info("Running testLeaderElectionMetrics");
+ Timestamp timestamp = Timestamp.currentTime();
+ final MiniRaftCluster cluster = newCluster(3);
+ cluster.start();
+ RaftServerImpl leaderServer = waitForLeader(cluster);
+
+ RatisMetricRegistry ratisMetricRegistry =
RatisMetrics.getMetricRegistryForLeaderElection(leaderServer
+ .getMemberId().toString());
+
+ // Verify each metric individually.
+ long numLeaderElections =
ratisMetricRegistry.counter(LEADER_ELECTION_COUNT_METRIC).getCount();
+ assertTrue(numLeaderElections > 0);
+
+ long numLeaderElectionTimeout =
ratisMetricRegistry.counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC).getCount();
+ assertTrue(numLeaderElectionTimeout > 0);
+
+ Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s,
metric) ->
+
s.contains(LEADER_ELECTION_LATENCY)).values().iterator().next().getValue();
+ assertTrue(leaderElectionLatency > 0 && leaderElectionLatency <
timestamp.elapsedTimeMs());
+ }
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
new file mode 100644
index 0000000..b17348e
--- /dev/null
+++
b/ratis-server/src/test/java/org/apache/ratis/server/metrics/TestLeaderElectionMetrics.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_COUNT_METRIC;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_LATENCY;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_ELECTION_TIMEOUT_COUNT_METRIC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerState;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Test for LeaderElectionMetrics.
+ */
+public class TestLeaderElectionMetrics {
+
+ private static LeaderElectionMetrics leaderElectionMetrics;
+ private static RatisMetricRegistry ratisMetricRegistry;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ RaftServerImpl raftServer = mock(RaftServerImpl.class);
+ ServerState serverStateMock = mock(ServerState.class);
+ when(raftServer.getState()).thenReturn(serverStateMock);
+ when(serverStateMock.getLastLeaderElapsedTimeMs()).thenReturn(1000L);
+ RaftGroupId raftGroupId = RaftGroupId.randomId();
+ RaftPeerId raftPeerId = RaftPeerId.valueOf("TestId");
+ RaftGroupMemberId raftGroupMemberId =
RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
+ when(raftServer.getMemberId()).thenReturn(raftGroupMemberId);
+ leaderElectionMetrics =
LeaderElectionMetrics.getLeaderElectionMetrics(raftServer);
+ ratisMetricRegistry =
RatisMetrics.getMetricRegistryForLeaderElection(raftServer.getMemberId().toString());
+ }
+
+ @Test
+ public void testOnNewLeaderElection() throws Exception {
+ long numLeaderElections = ratisMetricRegistry.counter(
+ LEADER_ELECTION_COUNT_METRIC).getCount();
+ assertTrue(numLeaderElections == 0);
+ leaderElectionMetrics.onNewLeaderElection();
+ numLeaderElections =
ratisMetricRegistry.counter(LEADER_ELECTION_COUNT_METRIC).getCount();
+ assertEquals(1, numLeaderElections);
+ }
+
+ @Test
+ public void testOnLeaderElectionCompletion() throws Exception {
+ leaderElectionMetrics.onLeaderElectionCompletion(500L);
+ Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s,
metric) ->
+
s.contains(LEADER_ELECTION_LATENCY)).values().iterator().next().getValue();
+ assertEquals(500L, leaderElectionLatency.longValue());
+ }
+
+ @Test
+ public void testOnLeaderElectionTimeout() throws Exception {
+ long numLeaderElectionTimeouts = ratisMetricRegistry.counter(
+ LEADER_ELECTION_TIMEOUT_COUNT_METRIC).getCount();
+ assertTrue(numLeaderElectionTimeouts == 0);
+ leaderElectionMetrics.onLeaderElectionTimeout();
+ numLeaderElectionTimeouts =
ratisMetricRegistry.counter(LEADER_ELECTION_TIMEOUT_COUNT_METRIC).getCount();
+ assertEquals(1, numLeaderElectionTimeouts);
+ }
+}
\ No newline at end of file
diff --git
a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index eb6d562..63d9da6 100644
---
a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++
b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -19,9 +19,11 @@ package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -37,8 +39,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
+import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Gauge;
+
/**
* Test Raft Server Slowness detection and notification to Leader's
statemachine.
*/
@@ -79,15 +84,29 @@ public class TestRaftServerSlownessDetection extends
BaseTest {
@Test
public void testSlownessDetection() throws Exception {
- RaftTestUtil.waitForLeader(cluster);
+ RaftServerImpl leaderServer = RaftTestUtil.waitForLeader(cluster);
long slownessTimeout = RaftServerConfigKeys.Rpc
.slownessTimeout(cluster.getProperties()).toIntExact(TimeUnit.MILLISECONDS);
RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+ RatisMetricRegistry ratisMetricRegistry =
RatisMetrics.getMetricRegistryForHeartbeat(
+ leaderServer.getMemberId().toString());
+ SortedMap<String, Gauge> heartbeatElapsedTimeGauges =
ratisMetricRegistry.getGauges((s, metric) ->
+ s.contains("last_heartbeat_elapsed_time"));
+
+ String followerId = failedFollower.getId().toString();
+ Gauge metric =
heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e ->
e.getKey().contains(
+ followerId)).iterator().next().getValue();
+
+ long followerHeartBeatElapsedMetric = (long) metric.getValue();
+
// fail the node and wait for the callback to be triggered
cluster.killServer(failedFollower.getId());
Thread.sleep( slownessTimeout * 2);
+ long followerHeartBeatElapsedMetricNew = (long) metric.getValue();
+ Assert.assertTrue(followerHeartBeatElapsedMetricNew >
followerHeartBeatElapsedMetric);
+
// Followers should not get any failed not notification
for (RaftServerImpl followerServer : cluster.getFollowers()) {
Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo());