This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b98fac8 RATIS-549 RATIS-550 RATIS-552 RATIS-553 Metric Framework for
Ratis
b98fac8 is described below
commit b98fac8804193408b316f8af5983feb557da0cba
Author: Ankit Singhal <[email protected]>
AuthorDate: Mon May 13 19:03:41 2019 -0700
RATIS-549 RATIS-550 RATIS-552 RATIS-553 Metric Framework for Ratis
Closes #19
Signed-off-by: Josh Elser <[email protected]>
---
pom.xml | 15 ++-
ratis-assembly/pom.xml | 4 +
ratis-common/pom.xml | 5 -
.../java/org/apache/ratis/util/AtomicUtils.java | 60 +++++++++
.../org/apache/ratis/util/CollectionUtils.java | 21 +++
.../main/java/org/apache/ratis/util/DataQueue.java | 7 +-
.../ratis/logservice/client/LogServiceClient.java | 2 +-
.../metrics/LogServiceMetricsRegistry.java | 69 ++++++++++
.../ratis/logservice/server/LogStateMachine.java | 88 +++++++++---
.../ratis/logservice/server/MetaStateMachine.java | 58 +++++---
.../ratis/logservice/LogServiceReadWriteBase.java | 68 +++++++++-
.../ratis/logservice/server/TestMetaServer.java | 62 +++++++--
{ratis-server => ratis-metrics}/pom.xml | 44 +++---
.../org/apache/ratis/metrics/MetricRegistries.java | 86 ++++++++++++
.../ratis/metrics/MetricRegistriesLoader.java | 95 +++++++++++++
.../ratis/metrics/MetricRegistryFactory.java | 29 ++--
.../apache/ratis/metrics/MetricRegistryInfo.java | 87 ++++++++++++
.../ratis/metrics/impl/MetricRegistriesImpl.java | 76 +++++++++++
.../metrics/impl/MetricRegistryFactoryImpl.java | 25 ++--
.../ratis/metrics/impl/RatisMetricRegistry.java | 103 +++++++++++++++
.../apache/ratis/metrics/impl/RefCountingMap.java | 86 ++++++++++++
...rg.apache.hadoop.hbase.metrics.MetricRegistries | 18 +++
.../ratis/metrics/TestMetricRegistriesLoader.java | 54 ++++++++
.../ratis/metrics/impl/TestRefCountingMap.java | 147 +++++++++++++++++++++
ratis-server/pom.xml | 10 ++
.../apache/ratis/server/metrics/RatisMetrics.java | 51 +++++++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 30 +++--
.../ratis/statemachine/impl/BaseStateMachine.java | 16 +++
.../ratis/server/storage/RaftStorageTestUtils.java | 7 +
.../ratis/server/raftlog/TestRaftLogMetrics.java | 35 ++---
.../apache/ratis/util/TestDataBlockingQueue.java | 3 +-
31 files changed, 1314 insertions(+), 147 deletions(-)
diff --git a/pom.xml b/pom.xml
index ca7fcb4..9ec29e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
<module>ratis-examples</module>
<module>ratis-replicated-map</module>
<module>ratis-logservice</module>
+ <module>ratis-metrics</module>
</modules>
<pluginRepositories>
@@ -235,7 +236,18 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <artifactId>ratis-metrics</artifactId>
+ <groupId>org.apache.ratis</groupId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <artifactId>ratis-metrics</artifactId>
+ <groupId>org.apache.ratis</groupId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
@@ -248,7 +260,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
<dependency>
<artifactId>ratis-hadoop</artifactId>
<groupId>org.apache.ratis</groupId>
diff --git a/ratis-assembly/pom.xml b/ratis-assembly/pom.xml
index 0c243de..990e6fd 100644
--- a/ratis-assembly/pom.xml
+++ b/ratis-assembly/pom.xml
@@ -241,6 +241,10 @@
<groupId>org.apache.ratis</groupId>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.ratis</groupId>
+ <artifactId>ratis-metrics</artifactId>
+ </dependency>
</dependencies>
<profiles>
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index 11d813d..e97d05a 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -53,11 +53,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- </dependency>
-
</dependencies>
<profiles>
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
new file mode 100644
index 0000000..264366f
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utilities related to atomic operations.
+ */
+public class AtomicUtils {
+ /**
+ * Updates an AtomicLong which is supposed to maintain the minimum value.
+ * This method is not synchronized but is thread-safe.
+ */
+ public static void updateMin(AtomicLong min, long value) {
+ while (true) {
+ long cur = min.get();
+ if (value >= cur) {
+ break;
+ }
+
+ if (min.compareAndSet(cur, value)) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Updates an AtomicLong which is supposed to maintain the maximum value.
+ * This method is not synchronized but is thread-safe.
+ */
+ public static void updateMax(AtomicLong max, long value) {
+ while (true) {
+ long cur = max.get();
+ if (value <= cur) {
+ break;
+ }
+
+ if (max.compareAndSet(cur, value)) {
+ break;
+ }
+ }
+ }
+
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 39d9908..f7100c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -18,6 +18,7 @@
package org.apache.ratis.util;
import java.util.*;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -106,4 +107,24 @@ public interface CollectionUtils {
Preconditions.assertTrue(removed,
() -> "Entry not found for key " + key + " in map " + name.get());
}
+
+ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key,
Supplier<V> supplier,
+ Runnable actionIfAbsent) {
+ V v = map.get(key);
+ if (v != null) {
+ return v;
+ }
+ V newValue = supplier.get();
+ v = map.putIfAbsent(key, newValue);
+ if (v != null) {
+ return v;
+ }
+ actionIfAbsent.run();
+ return newValue;
+ }
+
+ public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key,
Supplier<V> supplier) {
+ return computeIfAbsent(map, key, supplier, () -> {
+ });
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 8d4ec99..c0cc07e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -53,7 +53,8 @@ public class DataQueue<E> {
private int numBytes = 0;
- public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit,
ToIntFunction<E> getNumBytes) {
+ public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit,
+ ToIntFunction<E> getNumBytes) {
this.name = name != null? name: this;
this.byteLimit = byteLimit.getSizeInt();
this.elementLimit = elementLimit;
@@ -149,4 +150,8 @@ public class DataQueue<E> {
}
return polled;
}
+
+ public int size(){
+ return q.size();
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 1e42ab4..7db7833 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -166,7 +166,7 @@ public class LogServiceClient implements AutoCloseable {
}
/**
- * Moves the {@link LogStream} identified by the {@code name} from {@link
State.OPEN} to {@link State.CLOSED}.
+ * Moves the {@link LogStream} identified by the {@code name} from {@link
State#OPEN} to {@link State#CLOSED}.
* If the log is not {@link State#OPEN}, this method returns an error.
*
* @param name The name of the log to close
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java
new file mode 100644
index 0000000..8e6acfb
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.metrics;
+
+import java.util.Optional;
+
+import com.codahale.metrics.JmxReporter;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+public class LogServiceMetricsRegistry {
+ public static final String RATIS_LOG_SERVICE_METRICS_CONTEXT =
"RatisLogService";
+ public static final String RATIS_LOG_SERVICE_METRICS_DESC = "Ratis log
service metrics";
+ public static final String RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT =
+ "RatisLogServiceMetaData";
+ public static final String RATIS_LOG_SERVICE_META_DATA_METRICS_DESC =
+ "Ratis log service metadata metrics";
+ public static final String JMX_DOMAIN = "RatisLogService";
+
+ public static RatisMetricRegistry createMetricRegistryForLogService(String
logName) {
+ return create(new MetricRegistryInfo(logName,
RATIS_LOG_SERVICE_METRICS_DESC,
+ RATIS_LOG_SERVICE_METRICS_CONTEXT));
+ }
+
+ public static RatisMetricRegistry getMetricRegistryForLogService(String
logName) {
+ return MetricRegistries.global().get(
+ new MetricRegistryInfo(logName, RATIS_LOG_SERVICE_METRICS_DESC,
+ RATIS_LOG_SERVICE_METRICS_CONTEXT)).get();
+ }
+
+ public static RatisMetricRegistry
createMetricRegistryForLogServiceMetaData(String className) {
+ return create(new MetricRegistryInfo(className,
RATIS_LOG_SERVICE_META_DATA_METRICS_DESC,
+ RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT));
+ }
+
+ public static RatisMetricRegistry
getMetricRegistryForLogServiceMetaData(String className) {
+ return MetricRegistries.global().get(
+ new MetricRegistryInfo(className,
RATIS_LOG_SERVICE_META_DATA_METRICS_DESC,
+ RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT)).get();
+ }
+
+ private static RatisMetricRegistry create(MetricRegistryInfo info) {
+ Optional<RatisMetricRegistry> metricRegistry =
MetricRegistries.global().get(info);
+ if (metricRegistry.isPresent()) {
+ return metricRegistry.get();
+ }
+ RatisMetricRegistry registry = MetricRegistries.global().create(info);
+
JmxReporter.forRegistry(registry.getDropWizardMetricRegistry()).inDomain(JMX_DOMAIN).build().start();
+ return registry;
+ }
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index aafb16a..7405daa 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -30,7 +30,9 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ratis.logservice.api.LogName;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import
org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
@@ -40,6 +42,7 @@ import
org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.Message;
@@ -63,6 +66,16 @@ import org.slf4j.LoggerFactory;
public class LogStateMachine extends BaseStateMachine {
public static final Logger LOG =
LoggerFactory.getLogger(LogStateMachine.class);
+ private RatisMetricRegistry metricRegistry;
+ private Timer sizeRequestTimer;
+ private Timer readNextQueryTimer;
+ private Timer getStateTimer;
+ private Timer lastIndexQueryTimer;
+ private Timer lengthQueryTimer;
+ private Timer startIndexTimer;
+ private Timer appendRequestTimer;
+ private Timer syncRequesTimer;
+ private Timer getCloseLogTimer;
public static enum State {
OPEN, CLOSED
@@ -114,6 +127,18 @@ public class LogStateMachine extends BaseStateMachine {
this.storage.init(raftStorage);
this.proxy = (RaftServerProxy) server;
this.groupId = groupId;
+ //TODO: using groupId for metric now but better to tag it with LogName
+ this.metricRegistry =
+
LogServiceMetricsRegistry.createMetricRegistryForLogService(groupId.toString());
+ this.readNextQueryTimer = metricRegistry.timer("readNextQueryTime");
+ this.startIndexTimer= metricRegistry.timer("startIndexTime");
+ this.sizeRequestTimer = metricRegistry.timer("sizeRequestTime");
+ this.getStateTimer = metricRegistry.timer("getStateTime");
+ this.lastIndexQueryTimer = metricRegistry.timer("lastIndexQueryTime");
+ this.lengthQueryTimer = metricRegistry.timer("lengthQueryTime");
+ this.syncRequesTimer = metricRegistry.timer("syncRequesTime");
+ this.appendRequestTimer = metricRegistry.timer("appendRequestTime");
+ this.getCloseLogTimer = metricRegistry.timer("getCloseLogTime");
loadSnapshot(storage.getLatestSnapshot());
}
@@ -206,17 +231,41 @@ public class LogStateMachine extends BaseStateMachine {
switch (logServiceRequestProto.getRequestCase()) {
case READNEXTQUERY:
- return processReadRequest(logServiceRequestProto);
+ return recordTime(readNextQueryTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processReadRequest(logServiceRequestProto);
+ }
+ });
case SIZEREQUEST:
- return processGetSizeRequest(logServiceRequestProto);
+ return recordTime(sizeRequestTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processGetSizeRequest(logServiceRequestProto);
+ }
+ });
case STARTINDEXQUERY:
- return processGetStartIndexRequest(logServiceRequestProto);
+ return recordTime(startIndexTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processGetStartIndexRequest(logServiceRequestProto);
+ }
+ });
case GETSTATE:
- return processGetStateRequest(logServiceRequestProto);
+ return recordTime(getStateTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processGetStateRequest(logServiceRequestProto);
+ }
+ });
case LASTINDEXQUERY:
- return processGetLastCommittedIndexRequest(logServiceRequestProto);
+ return recordTime(lastIndexQueryTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return
processGetLastCommittedIndexRequest(logServiceRequestProto);
+ }
+ });
case LENGTHQUERY:
- return processGetLengthRequest(logServiceRequestProto);
+ return recordTime(lengthQueryTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processGetLengthRequest(logServiceRequestProto);
+ }
+ });
default:
// TODO
throw new RuntimeException(
@@ -232,7 +281,7 @@ public class LogStateMachine extends BaseStateMachine {
/**
* Process get start index request
- * @param msg message
+ * @param proto message
* @return reply message
*/
private CompletableFuture<Message>
@@ -247,7 +296,7 @@ public class LogStateMachine extends BaseStateMachine {
/**
* Process get last committed record index
- * @param msg message
+ * @param proto message
* @return reply message
*/
private CompletableFuture<Message>
@@ -262,7 +311,7 @@ public class LogStateMachine extends BaseStateMachine {
/**
* Process get length request
- * @param msg message
+ * @param proto message
* @return reply message
*/
private CompletableFuture<Message>
processGetSizeRequest(LogServiceRequestProto proto) {
@@ -282,7 +331,7 @@ public class LogStateMachine extends BaseStateMachine {
}
/**
* Process read log entries request
- * @param msg message
+ * @param proto message
* @return reply message
*/
private CompletableFuture<Message> processReadRequest(LogServiceRequestProto
proto) {
@@ -377,11 +426,20 @@ public class LogStateMachine extends BaseStateMachine {
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
switch (logServiceRequestProto.getRequestCase()) {
case CLOSELOG:
- return processCloseLog(logServiceRequestProto);
+ return recordTime(getCloseLogTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processCloseLog(logServiceRequestProto);
+ }});
case APPENDREQUEST:
- return processAppendRequest(trx, logServiceRequestProto);
+ return recordTime(appendRequestTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processAppendRequest(trx, logServiceRequestProto);
+ }});
case SYNCREQUEST:
- return processSyncRequest(trx, logServiceRequestProto);
+ return recordTime(syncRequesTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processSyncRequest(trx, logServiceRequestProto);
+ }});
default:
//TODO
return null;
@@ -396,7 +454,6 @@ public class LogStateMachine extends BaseStateMachine {
private CompletableFuture<Message> processCloseLog(LogServiceRequestProto
logServiceRequestProto) {
CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
- LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName());
// Need to check whether the file is opened if opened close it.
// TODO need to handle exceptions while operating with files.
return CompletableFuture.completedFuture(Message
@@ -408,7 +465,6 @@ public class LogStateMachine extends BaseStateMachine {
private CompletableFuture<Message> processGetStateRequest(
LogServiceRequestProto logServiceRequestProto) {
GetStateRequestProto getState = logServiceRequestProto.getGetState();
- LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
return
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
.toGetStateReplyProto(state == State.OPEN).toByteString()));
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 24f5263..3a255ad 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -32,6 +32,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.logservice.api.LogInfo;
@@ -39,6 +41,7 @@ import org.apache.ratis.logservice.api.LogName;
import org.apache.ratis.logservice.common.LogAlreadyExistException;
import org.apache.ratis.logservice.common.LogNotFoundException;
import org.apache.ratis.logservice.common.NoEnoughWorkersException;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogReplyProto;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogRequestProto;
@@ -50,6 +53,7 @@ import
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServiceUnregisterL
import org.apache.ratis.logservice.proto.MetaServiceProtos.MetaSMRequestProto;
import org.apache.ratis.logservice.util.LogServiceProtoUtil;
import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
@@ -100,6 +104,7 @@ public class MetaStateMachine extends BaseStateMachine {
private RaftGroupId metadataGroupId;
private RaftGroupId logServerGroupId;
+ private RatisMetricRegistry metricRegistry;
public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId
logServerGroupId) {
this.metadataGroupId = metadataGroupId;
@@ -109,6 +114,8 @@ public class MetaStateMachine extends BaseStateMachine {
@Override
public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage
storage) throws IOException {
this.raftServer = server;
+ this.metricRegistry = LogServiceMetricsRegistry
+
.createMetricRegistryForLogServiceMetaData(getClass().getSimpleName());
super.initialize(server, groupId, storage);
}
@@ -172,27 +179,31 @@ public class MetaStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> query(Message request) {
- if (currentGroup == null) {
- try {
- List<RaftGroup> x =
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
- .filter(group ->
group.getGroupId().equals(metadataGroupId))
- .collect(Collectors.toList());
- if (x.size() == 1) {
- currentGroup = x.get(0);
+ Timer.Context timerContext = null;
+ MetaServiceProtos.MetaServiceRequestProto.TypeCase type = null;
+ try {
+ if (currentGroup == null) {
+ try {
+ List<RaftGroup> x =
+
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+ .filter(group ->
group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
+ if (x.size() == 1) {
+ currentGroup = x.get(0);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
}
- } catch (IOException e) {
+ }
+ RaftProperties properties = new RaftProperties();
+ MetaServiceProtos.MetaServiceRequestProto req = null;
+ try {
+ req =
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
+ } catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
- }
- RaftProperties properties = new RaftProperties();
- MetaServiceProtos.MetaServiceRequestProto req = null;
- try {
- req =
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- MetaServiceProtos.MetaServiceRequestProto.TypeCase type =
req.getTypeCase();
- switch (type) {
+ type = req.getTypeCase();
+ timerContext = metricRegistry.timer(type.name()).time();
+ switch (type) {
case CREATELOG:
return processCreateLogRequest(req);
@@ -204,10 +215,15 @@ public class MetaStateMachine extends BaseStateMachine {
return processArchiveLog(req);
case DELETELOG:
return processDeleteLog(req);
- default:
+ default:
+ }
+ CompletableFuture<Message> reply = super.query(request);
+ return reply;
+ }finally{
+ if (timerContext != null) {
+ timerContext.stop();
+ }
}
- CompletableFuture<Message> reply = super.query(request);
- return reply;
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index e378068..166702e 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -29,6 +30,8 @@ import java.util.stream.LongStream;
import java.util.Iterator;
import java.util.List;
+import javax.management.ObjectName;
+
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
@@ -40,6 +43,7 @@ import org.apache.ratis.logservice.api.LogStream;
import org.apache.ratis.logservice.api.LogStream.State;
import org.apache.ratis.logservice.api.LogWriter;
import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import org.apache.ratis.logservice.server.LogStateMachine;
import org.apache.ratis.logservice.util.TestUtils;
import org.apache.ratis.statemachine.StateMachine;
@@ -64,6 +68,46 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
static final int NUM_PEERS = 3;
CLUSTER cluster;
+ class MetricLogStream extends LogStreamImpl{
+ Long startRecordIdCount = 0l;
+ Long getStateCount = 0l;
+ Long getLastRecordIdCount = 0l;
+ Long getLengthCount = 0l;
+ Long getSizeCount = 0l;
+
+ public MetricLogStream(LogName name, RaftClient raftClient) {
+ super(name, raftClient);
+ }
+
+ @Override public long getStartRecordId() throws IOException {
+ startRecordIdCount++;
+ return super.getStartRecordId();
+ }
+
+ @Override public State getState() {
+ getStateCount++;
+ return super.getState();
+ }
+
+ @Override public long getLastRecordId() throws IOException {
+ getLastRecordIdCount++;
+ return super.getLastRecordId();
+ }
+
+ @Override public long getLength() throws IOException {
+ getLengthCount++;
+ return super.getLength();
+ }
+
+ @Override public long getSize() throws IOException {
+ getSizeCount++;
+ return super.getSize();
+ }
+
+ @Override public LogName getName() {
+ return super.getName();
+ }
+ }
@Before
public void setUpCluster() throws IOException, InterruptedException {
cluster = newCluster(NUM_PEERS);
@@ -78,11 +122,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
.build();
LogName logName = LogName.of("log1");
// TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+ try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
assertEquals("log1", logStream.getName().getName());
assertEquals(State.OPEN, logStream.getState());
assertEquals(0, logStream.getSize());
assertEquals(0, logStream.getLength());
+ testJMXMetrics(logStream);
LogReader reader = logStream.createReader();
LogWriter writer = logStream.createWriter();
@@ -120,6 +165,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
ByteBuffer actual = actualIter.next();
assertEquals(expected, actual);
}
+ testJMXMetrics(logStream);
}
}
@@ -131,7 +177,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
final LogName logName = LogName.of("log1");
final int numRecords = 25;
// TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+ try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
// Write records 0 through 99 (inclusive)
@@ -172,7 +218,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
final LogName logName = LogName.of("log1");
final int numRecords = 100;
// TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+ try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
// Write records 0 through 99 (inclusive)
@@ -208,7 +254,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
.build();
final LogName logName = LogName.of("log1");
final int numRecords = 10;
- try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+ try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
final List<Long> recordIds;
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
@@ -258,4 +304,18 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
System.arraycopy(bb.array(), bb.arrayOffset(), bytes, 0, bb.remaining());
return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
}
+
+ private void testJMXMetrics(LogStream logStream) throws Exception {
+ assertEquals(((MetricLogStream) logStream).getLengthCount,
+ getJMXCount(cluster.getGroup().getGroupId().toString(),
"lengthQueryTime"));
+ assertEquals(((MetricLogStream) logStream).getSizeCount,
+ getJMXCount(cluster.getGroup().getGroupId().toString(),
"sizeRequestTime"));
+
+ }
+
+ private Long getJMXCount(String groupId, String metricName) throws Exception
{
+ ObjectName oname = new ObjectName(LogServiceMetricsRegistry.JMX_DOMAIN,
"name",
+ groupId + "." +
LogServiceMetricsRegistry.RATIS_LOG_SERVICE_METRICS_CONTEXT + "." + metricName);
+ return (Long)
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count");
+ }
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 47aa60a..7dab2f5 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -22,7 +22,8 @@ import org.apache.ratis.logservice.api.*;
import org.apache.ratis.logservice.client.LogServiceClient;
import org.apache.ratis.logservice.common.LogAlreadyExistException;
import org.apache.ratis.logservice.common.LogNotFoundException;
-import org.apache.ratis.logservice.server.LogServer;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.MetaServiceProtos;
import org.apache.ratis.logservice.util.LogServiceCluster;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
@@ -32,8 +33,10 @@ import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -41,11 +44,30 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
+import javax.management.ObjectName;
public class TestMetaServer {
static LogServiceCluster cluster = null;
+ static AtomicInteger createCount = new AtomicInteger();
+ static AtomicInteger deleteCount = new AtomicInteger();
+ static AtomicInteger listCount = new AtomicInteger();
+ LogServiceClient client = new LogServiceClient(cluster.getMetaIdentity()){
+ @Override public LogStream createLog(LogName logName) throws
IOException {
+ createCount.incrementAndGet();
+ return super.createLog(logName);
+ }
+
+ @Override public void deleteLog(LogName logName) throws IOException {
+ deleteCount.incrementAndGet();
+ super.deleteLog(logName);
+ }
+ @Override public List<LogInfo> listLogs() throws IOException {
+ listCount.incrementAndGet();
+ return super.listLogs();
+ }
+ };
@BeforeClass
public static void beforeClass() {
cluster = new LogServiceCluster(3);
@@ -66,19 +88,19 @@ public class TestMetaServer {
* @throws IOException
*/
@Test
- public void testCreateAndGetLog() throws IOException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
+ public void testCreateAndGetLog() throws Exception {
+
// This should be LogServiceStream ?
LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
assertNotNull(logStream1);
LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
+
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.GETLOG.name(),1l);
assertNotNull(logStream2);
}
@Test
public void testReadWritetoLog() throws IOException, InterruptedException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
LogStream stream = client.createLog(LogName.of("testReadWrite"));
LogWriter writer = stream.createWriter();
ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes());
@@ -107,12 +129,13 @@ public class TestMetaServer {
*/
@Test
- public void testDeleteLog() throws IOException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
+ public void testDeleteLog() throws Exception {
// This should be LogServiceStream ?
LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"));
assertNotNull(logStream1);
client.deleteLog(LogName.of("testDeleteLog"));
+
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
+ (long) deleteCount.get());
try {
logStream1 = client.getLog(LogName.of("testDeleteLog"));
fail("Failed to throw LogNotFoundException");
@@ -128,7 +151,6 @@ public class TestMetaServer {
*/
@Test
public void testGetNotExistingLog() {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
try {
LogStream log = client.getLog(LogName.of("no_such_log"));
fail("LogNotFoundException was not thrown");
@@ -142,8 +164,7 @@ public class TestMetaServer {
* @throws IOException
*/
@Test
- public void testAlreadyExistLog() throws IOException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
+ public void testAlreadyExistLog() throws Exception {
LogStream logStream1 = client.createLog(LogName.of("test1"));
assertNotNull(logStream1);
try {
@@ -159,8 +180,7 @@ public class TestMetaServer {
* @throws IOException
*/
@Test
- public void testListLogs() throws IOException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
+ public void testListLogs() throws Exception {
client.createLog(LogName.of("listLogTest1"));
client.createLog(LogName.of("listLogTest2"));
client.createLog(LogName.of("listLogTest3"));
@@ -168,15 +188,31 @@ public class TestMetaServer {
client.createLog(LogName.of("listLogTest5"));
client.createLog(LogName.of("listLogTest6"));
client.createLog(LogName.of("listLogTest7"));
+ // Test jmx
+
List<LogInfo> list = client.listLogs();
+
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.CREATELOG.name(),
+ (long) createCount.get() );
+
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.LISTLOGS.name(),listCount.longValue());
assert(list.stream().filter(log ->
log.getLogName().getName().startsWith("listLogTest")).count() == 7);
}
+ private void testJMXCount(String metricName, Long expectedCount) throws
Exception {
+ assertEquals(expectedCount, getJMXCount(metricName));
+ }
+
+ private Long getJMXCount(String metricName) throws Exception{
+ ObjectName oname = new
ObjectName(LogServiceMetricsRegistry.JMX_DOMAIN, "name",
+ MetaStateMachine.class.getSimpleName() + "."
+ +
LogServiceMetricsRegistry.RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT + "."
+ + metricName);
+ return (Long)
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count");
+ }
+
@Ignore ("Too heavy for the current implementation")
@Test
- public void testFinalClieanUp() throws IOException {
- LogServiceClient client = new
LogServiceClient(cluster.getMetaIdentity());
+ public void testFinalClieanUp() throws Exception {
IntStream.range(0, 10).forEach(i -> {
try {
client.createLog(LogName.of("CleanTest" + i));
diff --git a/ratis-server/pom.xml b/ratis-metrics/pom.xml
similarity index 78%
copy from ratis-server/pom.xml
copy to ratis-metrics/pom.xml
index 4ccb101..fde261b 100644
--- a/ratis-server/pom.xml
+++ b/ratis-metrics/pom.xml
@@ -20,55 +20,41 @@
<version>0.4.0-SNAPSHOT</version>
</parent>
- <artifactId>ratis-server</artifactId>
- <name>Apache Ratis Server</name>
+ <artifactId>ratis-metrics</artifactId>
+ <name>Apache Ratis Metrics</name>
<dependencies>
<dependency>
- <groupId>org.apache.ratis</groupId>
- <artifactId>ratis-thirdparty-misc</artifactId>
- </dependency>
- <dependency>
<artifactId>ratis-proto</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
-
<dependency>
- <artifactId>ratis-common</artifactId>
- <groupId>org.apache.ratis</groupId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <artifactId>ratis-common</artifactId>
- <groupId>org.apache.ratis</groupId>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
<scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <artifactId>ratis-client</artifactId>
- <groupId>org.apache.ratis</groupId>
</dependency>
<dependency>
- <artifactId>ratis-client</artifactId>
- <groupId>org.apache.ratis</groupId>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
<scope>test</scope>
- <type>test-jar</type>
</dependency>
-
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
+ <artifactId>ratis-common</artifactId>
+ <groupId>org.apache.ratis</groupId>
</dependency>
-
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>ratis-common</artifactId>
+ <groupId>org.apache.ratis</groupId>
<scope>test</scope>
+ <type>test-jar</type>
</dependency>
</dependencies>
</project>
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java
new file mode 100644
index 0000000..97c81cf
--- /dev/null
+++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+/**
+ * MetricRegistries is a collection of MetricRegistry's. MetricRegistries
implementations should do
+ * ref-counting of MetricRegistry's via create() and remove() methods.
+ */
+public abstract class MetricRegistries {
+
+ private static final class LazyHolder {
+ private static final MetricRegistries GLOBAL =
MetricRegistriesLoader.load();
+ }
+
+ /**
+ * Return the global singleton instance for the MetricRegistries.
+ * @return MetricRegistries implementation.
+ */
+ public static MetricRegistries global() {
+ return LazyHolder.GLOBAL;
+ }
+
+ /**
+ * Removes all the MetricRegistries.
+ */
+ public abstract void clear();
+
+ /**
+ * Create or return MetricRegistry with the given info. MetricRegistry will
only be created
+ * if current reference count is 0. Otherwise the ref count is incremented,
and an existing instance
+ * will be returned.
+ * @param info the info object for the MetricRegistry.
+ * @return created or existing MetricRegistry.
+ */
+ public abstract RatisMetricRegistry create(MetricRegistryInfo info);
+
+ /**
+ * Decrements the ref count of the MetricRegistry, and removes if ref count
== 0.
+ * @param key the info object for the MetricRegistry.
+ * @return true if metric registry is removed.
+ */
+ public abstract boolean remove(MetricRegistryInfo key);
+
+ /**
+ * Returns the MetricRegistry if found.
+ * @param info the info for the registry.
+ * @return a MetricRegistry optional.
+ */
+ public abstract Optional<RatisMetricRegistry> get(MetricRegistryInfo info);
+
+ /**
+ * Returns MetricRegistryInfo's for the MetricRegistry's created.
+ * @return MetricRegistryInfo's for the MetricRegistry's created.
+ */
+ public abstract Set<MetricRegistryInfo> getMetricRegistryInfos();
+
+ /**
+ * Returns MetricRegistry's created.
+ * @return MetricRegistry's created.
+ */
+ public abstract Collection<RatisMetricRegistry> getMetricRegistries();
+}
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
new file mode 100644
index 0000000..9e414ad
--- /dev/null
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public final class MetricRegistriesLoader {
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricRegistries.class);
+
+ private static final String defaultClass
+ = "org.apache.ratis.metrics.impl.MetricRegistriesImpl";
+
+ private MetricRegistriesLoader() {
+ }
+
+ /**
+ * Creates a {@link MetricRegistries} instance using the corresponding
{@link MetricRegistries}
+ * available to {@link ServiceLoader} on the classpath. If no instance is
found, then default
+ * implementation will be loaded.
+ * @return A {@link MetricRegistries} implementation.
+ */
+ public static MetricRegistries load() {
+ List<MetricRegistries> availableImplementations =
getDefinedImplemantations();
+ return load(availableImplementations);
+ }
+
+ /**
+ * Creates a {@link MetricRegistries} instance using the corresponding
{@link MetricRegistries}
+ * available to {@link ServiceLoader} on the classpath. If no instance is
found, then default
+ * implementation will be loaded.
+ * @return A {@link MetricRegistries} implementation.
+ */
+ @VisibleForTesting
+ static MetricRegistries load(List<MetricRegistries>
availableImplementations) {
+
+ if (availableImplementations.size() == 1) {
+ // One and only one instance -- what we want/expect
+ MetricRegistries impl = availableImplementations.get(0);
+ LOG.info("Loaded MetricRegistries " + impl.getClass());
+ return impl;
+ } else if (availableImplementations.isEmpty()) {
+ try {
+ return
ReflectionUtils.newInstance((Class<MetricRegistries>)Class.forName(defaultClass));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ // Tell the user they're doing something wrong, and choose the first
impl.
+ StringBuilder sb = new StringBuilder();
+ for (MetricRegistries factory : availableImplementations) {
+ if (sb.length() > 0) {
+ sb.append(", ");
+ }
+ sb.append(factory.getClass());
+ }
+ LOG.warn("Found multiple MetricRegistries implementations: " + sb
+ + ". Using first found implementation: " +
availableImplementations.get(0));
+ return availableImplementations.get(0);
+ }
+ }
+
+ private static List<MetricRegistries> getDefinedImplemantations() {
+ ServiceLoader<MetricRegistries> loader =
ServiceLoader.load(MetricRegistries.class);
+ List<MetricRegistries> availableFactories = new ArrayList<>();
+ for (MetricRegistries impl : loader) {
+ availableFactories.add(impl);
+ }
+ return availableFactories;
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
similarity index 66%
copy from
ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
copy to
ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
index c5601ed..68d3410 100644
---
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,20 +17,20 @@
* limitations under the License.
*/
+
package org.apache.ratis.metrics;
-import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
-public class RatisMetricsRegistry {
-
- private static final MetricRegistry metricsRegistry = new MetricRegistry();
-
- static {
-
JmxReporter.forRegistry(RatisMetricsRegistry.getRegistry()).build().start();
- }
-
- public static MetricRegistry getRegistry() {
- return metricsRegistry;
- }
+/**
+ * A Factory for creating MetricRegistries. This is the main plugin point for
metrics implementation
+ */
+public interface MetricRegistryFactory {
+ /**
+ * Create a MetricRegistry from the given MetricRegistryInfo
+ * @param info the descriptor for MetricRegistry
+ * @return a MetricRegistry implementation
+ */
+ RatisMetricRegistry create(MetricRegistryInfo info);
}
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java
new file mode 100644
index 0000000..e96bb8b
--- /dev/null
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+
+import java.util.Objects;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ *
+ * This class holds the name and description and JMX related context names for
such a group of
+ * metrics.
+ */
+public class MetricRegistryInfo {
+
+ protected final String metricsName;
+ protected final String metricsDescription;
+ protected final String metricsContext;
+ private final String fullName;
+
+ public MetricRegistryInfo(
+ String metricsName,
+ String metricsDescription,
+ String metricsContext) {
+ this.metricsName = metricsName;
+ this.metricsDescription = metricsDescription;
+ this.metricsContext = metricsContext;
+ this.fullName = MetricRegistry.name(metricsName,metricsContext);
+ }
+
+ /**
+ *
+ * @return The string context
+ */
+ public String getMetricsContext() {
+ return metricsContext;
+ }
+
+ /**
+ * Get the description of what this metric registry exposes.
+ */
+ public String getMetricsDescription() {
+ return metricsDescription;
+ }
+
+ /**
+ * Get the name of the metrics that are being exported by this registry.
+ */
+ public String getMetricsName() {
+ return metricsName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof MetricRegistryInfo) {
+ return this.hashCode() == obj.hashCode();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metricsName, metricsDescription, metricsContext);
+ }
+
+ public String getName() {
+ return fullName;
+ }
+}
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
new file mode 100644
index 0000000..e43f7b9
--- /dev/null
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryFactory;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+
+/**
+ * Implementation of MetricRegistries that does ref-counting.
+ */
+public class MetricRegistriesImpl extends MetricRegistries {
+ private final MetricRegistryFactory factory;
+ private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry>
registries;
+
+ public MetricRegistriesImpl() {
+ this(new MetricRegistryFactoryImpl());
+ }
+
+ public MetricRegistriesImpl(MetricRegistryFactory factory) {
+ this.factory = factory;
+ this.registries = new RefCountingMap<>();
+ }
+
+ @Override
+ public RatisMetricRegistry create(MetricRegistryInfo info) {
+ return registries.put(info, () -> factory.create(info));
+ }
+
+ @Override
+ public boolean remove(MetricRegistryInfo key) {
+ return registries.remove(key) == null;
+ }
+
+ @Override
+ public Optional<RatisMetricRegistry> get(MetricRegistryInfo info) {
+ return Optional.ofNullable(registries.get(info));
+ }
+
+ @Override
+ public Collection<RatisMetricRegistry> getMetricRegistries() {
+ return Collections.unmodifiableCollection(registries.values());
+ }
+
+ @Override
+ public void clear() {
+ registries.clear();
+ }
+
+ @Override
+ public Set<MetricRegistryInfo> getMetricRegistryInfos() {
+ return Collections.unmodifiableSet(registries.keySet());
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
similarity index 69%
rename from
ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
rename to
ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
index c5601ed..622f1a7 100644
---
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,30 +7,24 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.metrics.impl;
-package org.apache.ratis.metrics;
-
-import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.MetricRegistryFactory;
+import org.apache.ratis.metrics.MetricRegistryInfo;
-public class RatisMetricsRegistry {
-
- private static final MetricRegistry metricsRegistry = new MetricRegistry();
-
- static {
-
JmxReporter.forRegistry(RatisMetricsRegistry.getRegistry()).build().start();
- }
-
- public static MetricRegistry getRegistry() {
- return metricsRegistry;
+public class MetricRegistryFactoryImpl implements MetricRegistryFactory {
+ @Override
+ public RatisMetricRegistry create(MetricRegistryInfo info) {
+ return new RatisMetricRegistry(info);
}
}
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
new file mode 100644
index 0000000..41d4ad3
--- /dev/null
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricRegistry.MetricSupplier;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Wrapper around a {@link MetricRegistry} that prefixes the names of metrics it creates or registers with the registry info name.
+ */
+public class RatisMetricRegistry{
+ MetricRegistry metricRegistry = new MetricRegistry();
+
+ private final MetricRegistryInfo info;
+
+ public RatisMetricRegistry(MetricRegistryInfo info) {
+ super();
+ this.info = info;
+ }
+
+ public Timer timer(String name) {
+ return metricRegistry.timer(getMetricName(name));
+ }
+
+ public Counter counter(String name) {
+ return metricRegistry.counter(getMetricName(name));
+ }
+
+ public Gauge gauge(String name, MetricSupplier<Gauge> supplier) {
+ return metricRegistry.gauge(getMetricName(name), supplier);
+ }
+
+ public Timer timer(String name, MetricSupplier<Timer> supplier) {
+ return metricRegistry.timer(getMetricName(name), supplier);
+ }
+
+ public SortedMap<String, Gauge> getGauges(MetricFilter filter) {
+ return metricRegistry.getGauges(filter);
+ }
+
+ public Counter counter(String name, MetricSupplier<Counter> supplier) {
+ return metricRegistry.counter(getMetricName(name), supplier);
+ }
+
+ public Histogram histogram(String name) {
+ return metricRegistry.histogram(getMetricName(name));
+ }
+
+ public Meter meter(String name) {
+ return metricRegistry.meter(getMetricName(name));
+ }
+
+ public Meter meter(String name, MetricSupplier<Meter> supplier) {
+ return metricRegistry.meter(getMetricName(name), supplier);
+ }
+
+ @VisibleForTesting
+ public Metric get(String shortName) {
+ return metricRegistry.getMetrics().get(getMetricName(shortName));
+ }
+
+ private String getMetricName(String shortName) {
+ return MetricRegistry.name(info.getName(), shortName);
+ }
+
+ public <T extends Metric> T register(String name, T metric) throws
IllegalArgumentException {
+ return metricRegistry.register(getMetricName(name), metric);
+ }
+
+
+ public MetricRegistry getDropWizardMetricRegistry() {
+ return metricRegistry;
+ }
+}
diff --git
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java
new file mode 100644
index 0000000..07112d1
--- /dev/null
+++
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * A map of K to V, but does ref counting for added and removed values. The
values are
+ * not added directly, but instead requested from the given Supplier if ref
count == 0. Each put()
+ * call will increment the ref count, and each remove() will decrement it. The
values are removed
+ * from the map iff ref count == 0.
+ */
+class RefCountingMap<K, V> {
+
+ private ConcurrentHashMap<K, Payload<V>> map = new ConcurrentHashMap<>();
+ private static class Payload<V> {
+ V v;
+ int refCount;
+ Payload(V v) {
+ this.v = v;
+ this.refCount = 1; // create with ref count = 1
+ }
+ }
+
+ V put(K k, Supplier<V> supplier) {
+ return ((Payload<V>)map.compute(k, (k1, oldValue) -> {
+ if (oldValue != null) {
+ oldValue.refCount++;
+ return oldValue;
+ } else {
+ return new Payload(supplier.get());
+ }
+ })).v;
+ }
+
+ V get(K k) {
+ Payload<V> p = map.get(k);
+ return p == null ? null : p.v;
+ }
+
+ /**
+ * Decrements the ref count of k, and removes from map if ref count == 0.
+ * @param k the key to remove
+ * @return the value associated with the specified key or null if key is
removed from map.
+ */
+ V remove(K k) {
+ Payload<V> p = map.computeIfPresent(k, (k1, v) -> --v.refCount <= 0 ? null
: v);
+ return p == null ? null : p.v;
+ }
+
+ void clear() {
+ map.clear();
+ }
+
+ Set<K> keySet() {
+ return map.keySet();
+ }
+
+ Collection<V> values() {
+ return map.values().stream().map(v -> v.v).collect(Collectors.toList());
+ }
+
+ int size() {
+ return map.size();
+ }
+}
diff --git
a/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
b/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
new file mode 100644
index 0000000..298825f
--- /dev/null
+++
b/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.ratis.metrics.impl.MetricRegistriesImpl
diff --git
a/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
new file mode 100644
index 0000000..8429ded
--- /dev/null
+++
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.junit.Test;
+
+
+/**
+ * Test class for {@link MetricRegistriesLoader}.
+ *
+ * Checks which of the supplied {@link MetricRegistries} candidates is handed
+ * back by {@code MetricRegistriesLoader.load}.
+ */
+public class TestMetricRegistriesLoader {
+
+  /** With exactly one candidate, load() must return that candidate. */
+  @Test
+  public void testLoadSingleInstance() {
+    MetricRegistries loader = mock(MetricRegistries.class);
+    MetricRegistries instance = MetricRegistriesLoader.load(Lists.newArrayList(loader));
+    assertEquals(loader, instance);
+  }
+
+  /** With several candidates, load() must return the first one and none of the others. */
+  @Test
+  public void testLoadMultipleInstances() {
+    MetricRegistries loader1 = mock(MetricRegistries.class);
+    MetricRegistries loader2 = mock(MetricRegistries.class);
+    MetricRegistries loader3 = mock(MetricRegistries.class);
+    MetricRegistries instance =
+        MetricRegistriesLoader.load(Lists.newArrayList(loader1, loader2, loader3));
+
+    // load() returns the first instance in the list
+    assertEquals(loader1, instance);
+    assertNotEquals(loader2, instance);
+    assertNotEquals(loader3, instance);
+  }
+}
diff --git
a/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
new file mode 100644
index 0000000..a8c54d7
--- /dev/null
+++
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRefCountingMap {
+
+  private RefCountingMap<String, String> map;
+
+  /** Gives every test a fresh, empty map. */
+  @Before
+  public void setUp() {
+    map = new RefCountingMap<>();
+  }
+
+  /** A value that was put can be read back under its key. */
+  @Test
+  public void testPutGet() {
+    map.put("foo", () -> "foovalue");
+
+    final String stored = map.get("foo");
+    assertNotNull(stored);
+    assertEquals("foovalue", stored);
+  }
+
+  /** Repeated puts on one key keep, and return, the value of the first put. */
+  @Test
+  public void testPutMulti() {
+    final String first = map.put("foo", () -> "foovalue");
+    final String second = map.put("foo", () -> "foovalue2");
+    final String third = map.put("foo", () -> "foovalue3");
+
+    final String stored = map.get("foo");
+    assertEquals("foovalue", stored);
+    assertEquals(stored, first);
+    assertEquals(stored, second);
+    assertEquals(stored, third);
+  }
+
+  /** Removing the only reference returns null and leaves nothing behind. */
+  @Test
+  public void testPutRemove() {
+    map.put("foo", () -> "foovalue");
+
+    assertNull(map.remove("foo"));
+    assertNull(map.get("foo"));
+  }
+
+  /** Three puts need three removes before the key actually disappears. */
+  @Test
+  public void testPutRemoveMulti() {
+    map.put("foo", () -> "foovalue");
+    map.put("foo", () -> "foovalue2");
+    map.put("foo", () -> "foovalue3");
+
+    // removes 1 and 2 only release a reference, so the value is still returned
+    assertEquals("foovalue", map.remove("foo"));
+    assertEquals("foovalue", map.remove("foo"));
+
+    // remove 3 releases the last reference: null is returned and the key is gone
+    assertNull(map.remove("foo"));
+    assertNull(map.get("foo"));
+  }
+
+  /** size() counts distinct keys, not the number of puts. */
+  @Test
+  public void testSize() {
+    assertEquals(0, map.size());
+
+    // first key
+    map.put("foo", () -> "foovalue");
+    assertEquals(1, map.size());
+
+    // second, different key
+    map.put("bar", () -> "foovalue2");
+    assertEquals(2, map.size());
+
+    // same key again: the size must not change
+    map.put("bar", () -> "foovalue3");
+    assertEquals(2, map.size());
+  }
+
+  /** clear() empties the map. */
+  @Test
+  public void testClear() {
+    map.put("foo", () -> "foovalue");
+    map.put("bar", () -> "foovalue2");
+    map.put("baz", () -> "foovalue3");
+
+    map.clear();
+
+    assertEquals(0, map.size());
+  }
+
+  /** keySet() exposes exactly the keys that were put. */
+  @Test
+  public void testKeySet() {
+    map.put("foo", () -> "foovalue");
+    map.put("bar", () -> "foovalue2");
+    map.put("baz", () -> "foovalue3");
+
+    final Set<String> keys = map.keySet();
+    assertEquals(3, keys.size());
+
+    for (String expected : Lists.newArrayList("foo", "bar", "baz")) {
+      assertTrue(keys.contains(expected));
+    }
+  }
+
+  /** values() exposes one value per key; a second put on "foo" does not add one. */
+  @Test
+  public void testValues() {
+    map.put("foo", () -> "foovalue");
+    map.put("foo", () -> "foovalue2");
+    map.put("bar", () -> "foovalue3");
+    map.put("baz", () -> "foovalue4");
+
+    final Collection<String> values = map.values();
+    assertEquals(3, values.size());
+
+    for (String expected : Lists.newArrayList("foovalue", "foovalue3", "foovalue4")) {
+      assertTrue(values.contains(expected));
+    }
+  }
+}
diff --git a/ratis-server/pom.xml b/ratis-server/pom.xml
index 4ccb101..e78e780 100644
--- a/ratis-server/pom.xml
+++ b/ratis-server/pom.xml
@@ -70,5 +70,15 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ratis</groupId>
+ <artifactId>ratis-metrics</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ratis</groupId>
+ <artifactId>ratis-metrics</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
new file mode 100644
index 0000000..165072a
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import java.util.Optional;
+
+import com.codahale.metrics.JmxReporter;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+/**
+ * Static helpers that create and look up the metric registry used by a Raft
+ * log worker, backed by the global {@link MetricRegistries} instance.
+ */
+public class RatisMetrics {
+  public static final String RATIS_LOG_WORKER_METRICS_DESC = "Ratis Log worker metrics";
+  public static final String RATIS_LOG_WORKER_METRICS_CONTEXT = "RaftLogWorker";
+
+  /**
+   * Returns the log worker registry for {@code name}, creating it (and starting
+   * a JMX reporter for it) if the global registries do not hold one yet.
+   *
+   * @param name the name the registry is registered under
+   * @return the existing or newly created registry
+   */
+  public static RatisMetricRegistry createMetricRegistryForLogWorker(String name) {
+    return create(logWorkerInfo(name));
+  }
+
+  /**
+   * Looks up the log worker registry for {@code name} without creating it.
+   *
+   * @param name the name the registry was registered under
+   * @return the registry previously created for {@code name}
+   * @throws java.util.NoSuchElementException if no such registry exists
+   */
+  public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
+    return MetricRegistries.global().get(logWorkerInfo(name))
+        .orElseThrow(() -> new java.util.NoSuchElementException(
+            "No log worker metric registry has been created for " + name));
+  }
+
+  /** Builds the registry key shared by the create and get paths so they cannot drift apart. */
+  private static MetricRegistryInfo logWorkerInfo(String name) {
+    return new MetricRegistryInfo(name, RATIS_LOG_WORKER_METRICS_DESC,
+        RATIS_LOG_WORKER_METRICS_CONTEXT);
+  }
+
+  /** Returns the registry for {@code info}, creating and JMX-exporting it on first use. */
+  private static RatisMetricRegistry create(MetricRegistryInfo info) {
+    final MetricRegistries registries = MetricRegistries.global();
+    final Optional<RatisMetricRegistry> existing = registries.get(info);
+    if (existing.isPresent()) {
+      // Already created by an earlier caller; do not start a second reporter.
+      return existing.get();
+    }
+    final RatisMetricRegistry registry = registries.create(info);
+    // NOTE(review): the reporter is started but no reference is kept, so this
+    // class cannot stop it later.
+    JmxReporter.forRegistry(registry.getDropWizardMetricRegistry())
+        .inDomain("RatisCore")
+        .build()
+        .start();
+    return registry;
+  }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index bdec7ea..78e7191 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -17,15 +17,18 @@
*/
package org.apache.ratis.server.raftlog.segmented;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricsRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.storage.RaftStorage;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
import
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -52,6 +55,7 @@ class SegmentedRaftLogWorker implements Runnable {
static final Logger LOG =
LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
+ private final RatisMetricRegistry metricRegistry;
static class StateMachineDataPolicy {
private final boolean sync;
@@ -98,7 +102,7 @@ class SegmentedRaftLogWorker implements Runnable {
private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
- private final Supplier<Timer> logFlushTimer;
+ private final Timer logFlushTimer;
/**
* The number of entries that have been written into the
SegmentedRaftLogOutputStream but
@@ -125,12 +129,23 @@ class SegmentedRaftLogWorker implements Runnable {
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
this.stateMachine = stateMachine;
-
+ this.metricRegistry =
RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
this.storage = storage;
-
final SizeInBytes queueByteLimit =
RaftServerConfigKeys.Log.queueByteLimit(properties);
final int queueElementLimit =
RaftServerConfigKeys.Log.queueElementLimit(properties);
- this.queue = new DataBlockingQueue<>(name, queueByteLimit,
queueElementLimit, Task::getSerializedSize);
+ this.queue =
+ new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit,
Task::getSerializedSize);
+
+ metricRegistry.gauge("dataQueueSize", new MetricRegistry.MetricSupplier() {
+ @Override public Metric newMetric() {
+ return new Gauge<Integer>() {
+ @Override public Integer getValue() {
+ //q.size() is O(1) operation
+ return queue.size();
+ }
+ };
+ }
+ });
this.segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.preallocatedSize =
RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
@@ -142,8 +157,7 @@ class SegmentedRaftLogWorker implements Runnable {
this.workerThread = new Thread(this, name);
// Server Id can be null in unit tests
- this.logFlushTimer = JavaUtils.memoize(() ->
RatisMetricsRegistry.getRegistry()
- .timer(MetricRegistry.name(SegmentedRaftLogWorker.class,
selfId.toString(), "flush-time")));
+ this.logFlushTimer = metricRegistry.timer("flush-time");
}
void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -262,7 +276,7 @@ class SegmentedRaftLogWorker implements Runnable {
private void flushWrites() throws IOException {
if (out != null) {
LOG.debug("{}: flush {}", name, out);
- final Timer.Context timerContext = logFlushTimer.get().time();
+ final Timer.Context timerContext = logFlushTimer.time();
try {
final CompletableFuture<Void> f = stateMachine != null ?
stateMachine.flushStateMachineData(lastWrittenIndex) :
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8a09bd4..705876e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.ratis.statemachine.impl;
+import com.codahale.metrics.Timer;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
@@ -197,4 +198,19 @@ public class BaseStateMachine implements StateMachine {
return getClass().getSimpleName() + ":"
+ (!server.isDone()? "uninitialized": getId() + ":" + groupId);
}
+
+
+ /**
+ * Runs {@code task} and records in {@code timer} how long it takes for the
+ * returned future to complete, whether normally or exceptionally.
+ *
+ * If the task throws, or returns null instead of a future, the timer is
+ * stopped immediately and the exception / null is passed on to the caller.
+ *
+ * @param timer the timer that receives the measured duration
+ * @param task the work to run and measure
+ * @return the very future returned by {@code task.run()}
+ */
+ protected CompletableFuture<Message> recordTime(Timer timer, Task task) {
+   final Timer.Context timerContext = timer.time();
+   final CompletableFuture<Message> future;
+   try {
+     future = task.run();
+   } catch (RuntimeException | Error e) {
+     // No future will ever complete, so close the measurement here.
+     timerContext.stop();
+     throw e;
+   }
+   if (future == null) {
+     timerContext.stop();
+     return null;
+   }
+   // Stop on completion rather than on return, so asynchronous work is measured too.
+   // The original future is returned (not the dependent stage) so callers keep
+   // the same object they would have received before.
+   future.whenComplete((reply, error) -> timerContext.stop());
+   return future;
+ }
+
+ /**
+ * A unit of work passed to {@link #recordTime(Timer, Task)}.
+ */
+ @FunctionalInterface
+ protected interface Task {
+   /**
+    * Performs the work.
+    *
+    * @return the future produced by the work
+    */
+   CompletableFuture<Message> run();
+ }
+
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index 29fe962..07283b0 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -17,7 +17,9 @@
*/
package org.apache.ratis.server.storage;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -26,6 +28,11 @@ import org.apache.ratis.util.AutoCloseableLock;
import java.util.function.Consumer;
public interface RaftStorageTestUtils {
+
+ /**
+ * Builds the dotted name {@code <serverId>.RaftLogWorker.flush-time} for the
+ * log worker's flush timer.
+ *
+ * @param serverId the id of the server whose flush timer is meant
+ * @return the dotted metric name
+ */
+ static String getLogFlushTimeMetric(RaftPeerId serverId) {
+   return String.join(".",
+       String.valueOf(serverId),
+       RatisMetrics.RATIS_LOG_WORKER_METRICS_CONTEXT,
+       "flush-time");
+ }
+
static void printLog(RaftLog log, Consumer<String> println) {
if (log == null) {
println.accept("log == null");
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 81ad927..6660896 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -23,10 +23,10 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.metrics.RatisMetricsRegistry;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.JavaUtils;
@@ -99,20 +99,21 @@ public class TestRaftLogMetrics extends BaseTest
}
static void assertFlushCount(RaftServerImpl server) throws Exception {
- final String flushTimeMetric =
SegmentedRaftLogTestUtils.getLogFlushTimeMetric(server.getId());
- Timer tm =
RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
- Assert.assertNotNull(tm);
-
- final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
- final int expectedFlush = stateMachine.getFlushCount();
-
- Assert.assertEquals(expectedFlush, tm.getCount());
- Assert.assertTrue(tm.getMeanRate() > 0);
-
- // Test jmx
- ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
- Assert.assertEquals(expectedFlush,
- ((Long)
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
- .intValue());
+ final String flushTimeMetric =
RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
+ Timer tm = (Timer)
RatisMetrics.getMetricRegistryForLogWorker(server.getId().toString())
+ .get("flush-time");
+ Assert.assertNotNull(tm);
+
+ final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
+ final int expectedFlush = stateMachine.getFlushCount();
+
+ Assert.assertEquals(expectedFlush, tm.getCount());
+ Assert.assertTrue(tm.getMeanRate() > 0);
+
+ // Test jmx
+ ObjectName oname = new ObjectName("RatisCore", "name", flushTimeMetric);
+ Assert.assertEquals(expectedFlush,
+ ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname,
"Count"))
+ .intValue());
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
index d7fc520..cfd45d5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
@@ -30,7 +30,8 @@ public class TestDataBlockingQueue {
final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
final int elementLimit = 10;
- final DataBlockingQueue<Integer> q = new DataBlockingQueue<>(null,
byteLimit, elementLimit, Integer::intValue);
+ final DataBlockingQueue<Integer> q =
+ new DataBlockingQueue<>(null, byteLimit, elementLimit,
Integer::intValue);
final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);