This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ced7dbf RATIS-649. Add metrics related to ClientRequests. Contributed by Aravindan Vijayan.
ced7dbf is described below
commit ced7dbfea404523ebc6b415d2e42979c2ac6b351
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Fri Oct 18 16:25:22 2019 +0530
RATIS-649. Add metrics related to ClientRequests. Contributed by Aravindan Vijayan.
---
.../apache/ratis/protocol/RaftClientRequest.java | 2 +-
.../apache/ratis/server/impl/FollowerState.java | 2 +-
.../org/apache/ratis/server/impl/LeaderState.java | 4 +-
.../apache/ratis/server/impl/PendingRequests.java | 10 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 104 ++++++++++++---------
.../ratis/server/impl/RaftServerMetrics.java | 51 +++++++++-
.../ratis/server/metrics/RatisMetricNames.java | 15 +++
.../apache/ratis/server/metrics/RatisMetrics.java | 5 -
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 43 ++++++++-
9 files changed, 174 insertions(+), 62 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index ae4e9ae..4c10c0c 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -127,7 +127,7 @@ public class RaftClientRequest extends RaftClientMessage {
return (WatchRequestTypeProto)proto;
}
- static String toString(ReplicationLevel replication) {
+ public static String toString(ReplicationLevel replication) {
return replication == ReplicationLevel.MAJORITY? "": "-" + replication;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 5f5bddb..fea78b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -62,7 +62,7 @@ class FollowerState extends Daemon {
FollowerState(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + getClass().getSimpleName();
this.server = server;
- raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(server);
+ raftServerMetrics = server.getRaftServerMetrics();
raftServerMetrics.addPeerCommitIndexGauge(server.getPeer());
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5886124..8f3e54b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -215,7 +215,8 @@ public class LeaderState {
this.eventQueue = new EventQueue();
processor = new EventProcessor();
- this.pendingRequests = new PendingRequests(server.getMemberId(),
properties);
+ raftServerMetrics = server.getRaftServerMetrics();
+ this.pendingRequests = new PendingRequests(server.getMemberId(),
properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
final RaftConfiguration conf = server.getRaftConf();
@@ -223,7 +224,6 @@ public class LeaderState {
placeHolderIndex = raftLog.getNextIndex();
senders = new SenderList();
- raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(server);
addSenders(others, placeHolderIndex, true);
voterLists = divideFollowers(conf);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 7fa03b8..b37b4f0 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -51,21 +51,25 @@ class PendingRequests {
private static class RequestMap {
private final Object name;
private final ConcurrentMap<Long, PendingRequest> map = new
ConcurrentHashMap<>();
+ private final RaftServerMetrics raftServerMetrics;
/** Permits to put new requests, always synchronized. */
private final Map<Permit, Permit> permits = new HashMap<>();
/** Track and limit the number of requests. */
private final ResourceSemaphore resource;
- RequestMap(Object name, int capacity) {
+ RequestMap(Object name, int capacity, RaftServerMetrics raftServerMetrics)
{
this.name = name;
this.resource = new ResourceSemaphore(capacity);
+ this.raftServerMetrics = raftServerMetrics;
+ raftServerMetrics.addNumPendingRequestsGauge(resource, capacity);
}
Permit tryAcquire() {
final boolean acquired = resource.tryAcquire();
LOG.trace("tryAcquire? {}", acquired);
if (!acquired) {
+ raftServerMetrics.onRequestQueueLimitHit();
return null;
}
return putPermit();
@@ -135,9 +139,9 @@ class PendingRequests {
private final String name;
private final RequestMap pendingRequests;
- PendingRequests(RaftGroupMemberId id, RaftProperties properties) {
+ PendingRequests(RaftGroupMemberId id, RaftProperties properties,
RaftServerMetrics raftServerMetrics) {
this.name = id + "-" + getClass().getSimpleName();
- this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties));
+ this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties), raftServerMetrics);
}
Permit tryAcquire() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 443f23d..5434698 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -26,8 +25,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
-import org.apache.ratis.server.metrics.RatisMetricNames;
-import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
@@ -92,7 +89,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
private final RaftServerJmxAdapter jmxAdapter;
private final LeaderElectionMetrics leaderElectionMetricsRegistry;
- private final RatisMetricRegistry raftServerMetricsRegistry;
+ private final RaftServerMetrics raftServerMetrics;
private AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
@@ -119,7 +116,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetricsRegistry = getLeaderElectionMetrics(this);
- this.raftServerMetricsRegistry =
RatisMetrics.getMetricsRegistryForServer(id.toString());
+ this.raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(this);
}
private RetryCache initRetryCache(RaftProperties prop) {
@@ -545,49 +542,60 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
RaftClientRequest request) throws IOException {
assertLifeCycleState(RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
- if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
- return staleReadAsync(request);
- }
-
- // first check the server's leader state
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
- if (reply != null) {
- return reply;
- }
+ Timer timer = raftServerMetrics.getClientRequestTimer(request);
+ final Timer.Context timerContext = (timer != null) ? timer.time() : null;
- // let the state machine handle read-only request from client
- final StateMachine stateMachine = getStateMachine();
- if (request.is(RaftClientRequestProto.TypeCase.READ)) {
- // TODO: We might not be the leader anymore by the time this completes.
- // See the RAFT paper section 8 (last part)
- return processQueryFuture(stateMachine.query(request.getMessage()),
request);
- }
-
- if (request.is(RaftClientRequestProto.TypeCase.WATCH)) {
- return watchAsync(request);
+ CompletableFuture<RaftClientReply> replyFuture;
+ if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
+ replyFuture = staleReadAsync(request);
+ } else {
+ // first check the server's leader state
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request,
null);
+ if (reply != null) {
+ return reply;
+ }
+ // let the state machine handle read-only request from client
+ final StateMachine stateMachine = getStateMachine();
+ if (request.is(RaftClientRequestProto.TypeCase.READ)) {
+ // TODO: We might not be the leader anymore by the time this completes.
+ // See the RAFT paper section 8 (last part)
+ replyFuture =
processQueryFuture(stateMachine.query(request.getMessage()), request);
+ } else if (request.is(RaftClientRequestProto.TypeCase.WATCH)) {
+ replyFuture = watchAsync(request);
+ } else {
+ // query the retry cache
+ RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
+ request.getClientId(), request.getCallId());
+ if (previousResult.isRetry()) {
+ // if the previous attempt is still pending or it succeeded, return
its
+ // future
+ raftServerMetrics.onRetryRequestCacheHit();
+ replyFuture = previousResult.getEntry().getReplyFuture();
+ } else {
+ final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
+
+ // TODO: this client request will not be added to pending requests
until
+ // later which means that any failure in between will leave partial
state in
+ // the state machine. We should call cancelTransaction() for failed
requests
+ TransactionContext context = stateMachine.startTransaction(request);
+ if (context.getException() != null) {
+ RaftClientReply exceptionReply = new RaftClientReply(request,
+ new StateMachineException(getMemberId(),
context.getException()), getCommitInfos());
+ cacheEntry.failWithReply(exceptionReply);
+ replyFuture = CompletableFuture.completedFuture(exceptionReply);
+ } else {
+ replyFuture = appendTransaction(request, context, cacheEntry);
+ }
+ }
+ }
}
- // query the retry cache
- RetryCache.CacheQueryResult previousResult = retryCache.queryCache(
- request.getClientId(), request.getCallId());
- if (previousResult.isRetry()) {
- // if the previous attempt is still pending or it succeeded, return its
- // future
- return previousResult.getEntry().getReplyFuture();
- }
- final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
-
- // TODO: this client request will not be added to pending requests until
- // later which means that any failure in between will leave partial state
in
- // the state machine. We should call cancelTransaction() for failed
requests
- TransactionContext context = stateMachine.startTransaction(request);
- if (context.getException() != null) {
- RaftClientReply exceptionReply = new RaftClientReply(request,
- new StateMachineException(getMemberId(), context.getException()),
getCommitInfos());
- cacheEntry.failWithReply(exceptionReply);
- return CompletableFuture.completedFuture(exceptionReply);
- }
- return appendTransaction(request, context, cacheEntry);
+ replyFuture.whenComplete((clientReply, exception) -> {
+ if (clientReply.isSuccess() && timerContext != null) {
+ timerContext.stop();
+ }
+ });
+ return replyFuture;
}
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest
request) {
@@ -910,7 +918,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final long currentTerm;
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
- Timer.Context timer =
raftServerMetricsRegistry.timer(RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY).time();
+ Timer.Context timer =
raftServerMetrics.getFollowerAppendEntryTimer().time();
synchronized (this) {
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
@@ -1323,6 +1331,10 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
return leaderElectionMetricsRegistry;
}
+ public RaftServerMetrics getRaftServerMetrics() {
+ return raftServerMetrics;
+ }
+
private class RaftServerJmxAdapter extends JmxRegister implements
RaftServerMXBean {
@Override
public String getId() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index df43990..192c959 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -18,21 +18,33 @@
package org.apache.ratis.server.impl;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.FOLLOWER_APPEND_ENTRIES_LATENCY;
import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_METRIC_PEER_COMMIT_INDEX;
import static
org.apache.ratis.server.metrics.RatisMetricNames.LEADER_METRIC_FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_READ_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_STALE_READ_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WATCH_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WRITE_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.REQUEST_QUEUE_SIZE;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RETRY_REQUEST_CACHE_HIT_COUNTER;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.metrics.RatisMetrics;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ResourceSemaphore;
/**
* Metric Registry for Raft Group Server. One instance per leader/follower.
@@ -49,13 +61,11 @@ public final class RaftServerMetrics {
RaftServerImpl raftServer) {
RaftServerMetrics serverMetrics = new RaftServerMetrics(raftServer);
metricsMap.put(raftServer.getMemberId().toString(), serverMetrics);
-
return serverMetrics;
}
private RaftServerMetrics(RaftServerImpl server) {
- registry = RatisMetrics.getMetricRegistryForRaftServer(
- server.getMemberId().toString());
+ registry =
RatisMetrics.getMetricRegistryForRaftServer(server.getMemberId().toString());
commitInfoCache = server.getCommitInfoCache();
addPeerCommitIndexGauge(server.getPeer());
}
@@ -130,4 +140,39 @@ public final class RaftServerMetrics {
followerLastHeartbeatElapsedTimeMap.put(peer.getId().toString(),
elapsedTime);
}
+
+ public Timer getFollowerAppendEntryTimer() {
+ return registry.timer(FOLLOWER_APPEND_ENTRIES_LATENCY);
+ }
+
+ public Timer getTimer(String timerName) {
+ return registry.timer(timerName);
+ }
+
+ public Timer getClientRequestTimer(RaftClientRequest request) {
+ if (request.is(TypeCase.READ)) {
+ return getTimer(RAFT_CLIENT_READ_REQUEST);
+ } else if (request.is(TypeCase.STALEREAD)) {
+ return getTimer(RAFT_CLIENT_STALE_READ_REQUEST);
+ } else if (request.is(TypeCase.WATCH)) {
+ String watchType =
RaftClientRequest.Type.toString(request.getType().getWatch().getReplication());
+ return getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType));
+ } else if (request.is(TypeCase.WRITE)) {
+ return getTimer(RAFT_CLIENT_WRITE_REQUEST);
+ }
+ return null;
+ }
+
+ public void onRetryRequestCacheHit() {
+ registry.counter(RETRY_REQUEST_CACHE_HIT_COUNTER).inc();
+ }
+
+ public void onRequestQueueLimitHit() {
+ registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
+ }
+
+ public void addNumPendingRequestsGauge(ResourceSemaphore resourceSemaphore,
int capacity) {
+ registry.gauge(REQUEST_QUEUE_SIZE,
+ () -> () -> (capacity - resourceSemaphore.availablePermits()));
+ }
}
\ No newline at end of file
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index c20b1c0..ffd6054 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -39,6 +39,21 @@ public final class RatisMetricNames {
public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
"statemachineApplyCompletedIndex";
+ // Raft client read request metric timer.
+ public static final String RAFT_CLIENT_READ_REQUEST = "clientReadRequest";
+
+ public static final String RAFT_CLIENT_STALE_READ_REQUEST =
"clientStaleReadRequest";
+
+ public static final String RAFT_CLIENT_WRITE_REQUEST = "clientWriteRequest";
+
+ public static final String RAFT_CLIENT_WATCH_REQUEST =
"clientWatch%sRequest";
+
+ public static final String RETRY_REQUEST_CACHE_HIT_COUNTER =
"numRetryCacheHits";
+
+ public static final String REQUEST_QUEUE_LIMIT_HIT_COUNTER =
"numRequestQueueLimitHits";
+
+ public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
+
//////////////////////////////
// Raft Log Write Path Metrics
/////////////////////////////
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 8714260..fd71fe3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -75,9 +75,4 @@ public class RatisMetrics {
RATIS_LOG_WORKER_METRICS_DESC));
return ratisMetricRegistry.orElse(null);
}
-
- public static RatisMetricRegistry getMetricsRegistryForServer(String
serverId) {
- return create(new MetricRegistryInfo(serverId,
RATIS_APPLICATION_NAME_METRICS, RATIS_SERVER_METRICS,
- RATIS_SERVER_METRICS_DESC));
- }
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 05048b7..41ac36e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -17,6 +17,11 @@
*/
package org.apache.ratis.grpc;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_READ_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_STALE_READ_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WATCH_REQUEST;
+import static
org.apache.ratis.server.metrics.RatisMetricNames.RAFT_CLIENT_WRITE_REQUEST;
+
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
@@ -29,6 +34,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -37,6 +43,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerMetrics;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -119,7 +126,7 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
@Test
public void testLeaderRestart() throws Exception {
- runWithNewCluster(3, this::runTestLeaderRestart);
+ runWithNewCluster(1, this::runTestLeaderRestart);
}
void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
@@ -167,6 +174,40 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
}
+ @Test
+ public void testRaftClientMetrics() throws Exception {
+ runWithNewCluster(3, this::testRaftClientRequestMetrics);
+ }
+
+ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws
IOException,
+ ExecutionException, InterruptedException {
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ RaftServerMetrics raftServerMetrics = leader.getRaftServerMetrics();
+
+ try (final RaftClient client = cluster.createClient()) {
+ final CompletableFuture<RaftClientReply> f1 = client.sendAsync(new
SimpleMessage("testing"));
+ Assert.assertTrue(f1.get().isSuccess());
+
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST).getCount()
> 0);
+
+ final CompletableFuture<RaftClientReply> f2 =
client.sendReadOnlyAsync(new SimpleMessage("testing"));
+ Assert.assertTrue(f2.get().isSuccess());
+
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST).getCount()
> 0);
+
+ final CompletableFuture<RaftClientReply> f3 =
client.sendStaleReadAsync(new SimpleMessage("testing"),
+ 0, leader.getId());
+ Assert.assertTrue(f3.get().isSuccess());
+
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST).getCount()
> 0);
+
+ final CompletableFuture<RaftClientReply> f4 = client.sendWatchAsync(0,
RaftProtos.ReplicationLevel.ALL);
+ Assert.assertTrue(f4.get().isSuccess());
+
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
"-ALL")).getCount() > 0);
+
+ final CompletableFuture<RaftClientReply> f5 = client.sendWatchAsync(0,
RaftProtos.ReplicationLevel.MAJORITY);
+ Assert.assertTrue(f5.get().isSuccess());
+
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
"")).getCount() > 0);
+ }
+ }
+
static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId
serverId, long seqNum) {
final SimpleMessage m = new SimpleMessage("m" + seqNum);
return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,