This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b98fac8  RATIS-549 RATIS-550 RATIS-552 RATIS-553 Metric Framework for Ratis
b98fac8 is described below

commit b98fac8804193408b316f8af5983feb557da0cba
Author: Ankit Singhal <[email protected]>
AuthorDate: Mon May 13 19:03:41 2019 -0700

    RATIS-549 RATIS-550 RATIS-552 RATIS-553 Metric Framework for Ratis
    
    Closes #19
    
    Signed-off-by: Josh Elser <[email protected]>
---
 pom.xml                                            |  15 ++-
 ratis-assembly/pom.xml                             |   4 +
 ratis-common/pom.xml                               |   5 -
 .../java/org/apache/ratis/util/AtomicUtils.java    |  60 +++++++++
 .../org/apache/ratis/util/CollectionUtils.java     |  21 +++
 .../main/java/org/apache/ratis/util/DataQueue.java |   7 +-
 .../ratis/logservice/client/LogServiceClient.java  |   2 +-
 .../metrics/LogServiceMetricsRegistry.java         |  69 ++++++++++
 .../ratis/logservice/server/LogStateMachine.java   |  88 +++++++++---
 .../ratis/logservice/server/MetaStateMachine.java  |  58 +++++---
 .../ratis/logservice/LogServiceReadWriteBase.java  |  68 +++++++++-
 .../ratis/logservice/server/TestMetaServer.java    |  62 +++++++--
 {ratis-server => ratis-metrics}/pom.xml            |  44 +++---
 .../org/apache/ratis/metrics/MetricRegistries.java |  86 ++++++++++++
 .../ratis/metrics/MetricRegistriesLoader.java      |  95 +++++++++++++
 .../ratis/metrics/MetricRegistryFactory.java       |  29 ++--
 .../apache/ratis/metrics/MetricRegistryInfo.java   |  87 ++++++++++++
 .../ratis/metrics/impl/MetricRegistriesImpl.java   |  76 +++++++++++
 .../metrics/impl/MetricRegistryFactoryImpl.java    |  25 ++--
 .../ratis/metrics/impl/RatisMetricRegistry.java    | 103 +++++++++++++++
 .../apache/ratis/metrics/impl/RefCountingMap.java  |  86 ++++++++++++
 ...rg.apache.hadoop.hbase.metrics.MetricRegistries |  18 +++
 .../ratis/metrics/TestMetricRegistriesLoader.java  |  54 ++++++++
 .../ratis/metrics/impl/TestRefCountingMap.java     | 147 +++++++++++++++++++++
 ratis-server/pom.xml                               |  10 ++
 .../apache/ratis/server/metrics/RatisMetrics.java  |  51 +++++++
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |  30 +++--
 .../ratis/statemachine/impl/BaseStateMachine.java  |  16 +++
 .../ratis/server/storage/RaftStorageTestUtils.java |   7 +
 .../ratis/server/raftlog/TestRaftLogMetrics.java   |  35 ++---
 .../apache/ratis/util/TestDataBlockingQueue.java   |   3 +-
 31 files changed, 1314 insertions(+), 147 deletions(-)

diff --git a/pom.xml b/pom.xml
index ca7fcb4..9ec29e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
     <module>ratis-examples</module>
     <module>ratis-replicated-map</module>
     <module>ratis-logservice</module>
+    <module>ratis-metrics</module>
   </modules>
 
   <pluginRepositories>
@@ -235,7 +236,18 @@
         <type>test-jar</type>
         <scope>test</scope>
       </dependency>
-
+      <dependency>
+       <artifactId>ratis-metrics</artifactId>
+       <groupId>org.apache.ratis</groupId>
+       <version>${project.version}</version>
+      </dependency>
+      <dependency>
+       <artifactId>ratis-metrics</artifactId>
+       <groupId>org.apache.ratis</groupId>
+       <version>${project.version}</version>
+       <type>test-jar</type>
+       <scope>test</scope>
+      </dependency>
       <dependency>
         <artifactId>ratis-grpc</artifactId>
         <groupId>org.apache.ratis</groupId>
@@ -248,7 +260,6 @@
         <type>test-jar</type>
         <scope>test</scope>
       </dependency>
-
       <dependency>
         <artifactId>ratis-hadoop</artifactId>
         <groupId>org.apache.ratis</groupId>
diff --git a/ratis-assembly/pom.xml b/ratis-assembly/pom.xml
index 0c243de..990e6fd 100644
--- a/ratis-assembly/pom.xml
+++ b/ratis-assembly/pom.xml
@@ -241,6 +241,10 @@
       <groupId>org.apache.ratis</groupId>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ratis</groupId>
+      <artifactId>ratis-metrics</artifactId>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index 11d813d..e97d05a 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -53,11 +53,6 @@
       <scope>test</scope>
     </dependency>
 
-    <dependency>
-      <groupId>io.dropwizard.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-    </dependency>
-
   </dependencies>
 
   <profiles>
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
new file mode 100644
index 0000000..264366f
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utilities related to atomic operations.
+ */
+public class AtomicUtils {
+  /**
+   * Updates a AtomicLong which is supposed to maintain the minimum values. This method is not
+   * synchronized but is thread-safe.
+   */
+  public static void updateMin(AtomicLong min, long value) {
+    while (true) {
+      long cur = min.get();
+      if (value >= cur) {
+        break;
+      }
+
+      if (min.compareAndSet(cur, value)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Updates a AtomicLong which is supposed to maintain the maximum values. This method is not
+   * synchronized but is thread-safe.
+   */
+  public static void updateMax(AtomicLong max, long value) {
+    while (true) {
+      long cur = max.get();
+      if (value <= cur) {
+        break;
+      }
+
+      if (max.compareAndSet(cur, value)) {
+        break;
+      }
+    }
+  }
+
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 39d9908..f7100c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.util;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -106,4 +107,24 @@ public interface CollectionUtils {
     Preconditions.assertTrue(removed,
         () -> "Entry not found for key " + key + " in map " + name.get());
   }
+
+  public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier,
+      Runnable actionIfAbsent) {
+    V v = map.get(key);
+    if (v != null) {
+      return v;
+    }
+    V newValue = supplier.get();
+    v = map.putIfAbsent(key, newValue);
+    if (v != null) {
+      return v;
+    }
+    actionIfAbsent.run();
+    return newValue;
+  }
+
+  public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Supplier<V> supplier) {
+    return computeIfAbsent(map, key, supplier, () -> {
+    });
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 8d4ec99..c0cc07e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -53,7 +53,8 @@ public class DataQueue<E> {
 
   private int numBytes = 0;
 
-  public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+  public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit,
+      ToIntFunction<E> getNumBytes) {
     this.name = name != null? name: this;
     this.byteLimit = byteLimit.getSizeInt();
     this.elementLimit = elementLimit;
@@ -149,4 +150,8 @@ public class DataQueue<E> {
     }
     return polled;
   }
+
+  public int size(){
+    return q.size();
+  }
 }
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 1e42ab4..7db7833 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -166,7 +166,7 @@ public class LogServiceClient implements AutoCloseable {
     }
 
     /**
-     * Moves the {@link LogStream} identified by the {@code name} from {@link State.OPEN} to {@link State.CLOSED}.
+     * Moves the {@link LogStream} identified by the {@code name} from {@link State#OPEN} to {@link State#CLOSED}.
      * If the log is not {@link State#OPEN}, this method returns an error.
      *
      * @param name The name of the log to close
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java
new file mode 100644
index 0000000..8e6acfb
--- /dev/null
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/metrics/LogServiceMetricsRegistry.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.logservice.metrics;
+
+import java.util.Optional;
+
+import com.codahale.metrics.JmxReporter;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+public class LogServiceMetricsRegistry {
+  public static final String RATIS_LOG_SERVICE_METRICS_CONTEXT = "RatisLogService";
+  public static final String RATIS_LOG_SERVICE_METRICS_DESC = "Ratis log service metrics";
+  public static final String RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT =
+      "RatisLogServiceMetaData";
+  public static final String RATIS_LOG_SERVICE_META_DATA_METRICS_DESC =
+      "Ratis log service metadata metrics";
+  public static final String JMX_DOMAIN = "RatisLogService";
+
+  public static RatisMetricRegistry createMetricRegistryForLogService(String logName) {
+    return create(new MetricRegistryInfo(logName, RATIS_LOG_SERVICE_METRICS_DESC,
+        RATIS_LOG_SERVICE_METRICS_CONTEXT));
+  }
+
+  public static RatisMetricRegistry getMetricRegistryForLogService(String logName) {
+    return MetricRegistries.global().get(
+        new MetricRegistryInfo(logName, RATIS_LOG_SERVICE_METRICS_DESC,
+            RATIS_LOG_SERVICE_METRICS_CONTEXT)).get();
+  }
+
+  public static RatisMetricRegistry createMetricRegistryForLogServiceMetaData(String className) {
+    return create(new MetricRegistryInfo(className, RATIS_LOG_SERVICE_META_DATA_METRICS_DESC,
+        RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT));
+  }
+
+  public static RatisMetricRegistry getMetricRegistryForLogServiceMetaData(String className) {
+    return MetricRegistries.global().get(
+        new MetricRegistryInfo(className, RATIS_LOG_SERVICE_META_DATA_METRICS_DESC,
+            RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT)).get();
+  }
+
+  private static RatisMetricRegistry create(MetricRegistryInfo info) {
+    Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);
+    if (metricRegistry.isPresent()) {
+      return metricRegistry.get();
+    }
+    RatisMetricRegistry registry = MetricRegistries.global().create(info);
+    JmxReporter.forRegistry(registry.getDropWizardMetricRegistry()).inDomain(JMX_DOMAIN).build().start();
+    return registry;
+  }
+
+}
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index aafb16a..7405daa 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -30,7 +30,9 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.ratis.logservice.api.LogName;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
@@ -40,6 +42,7 @@ import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
 import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.Message;
@@ -63,6 +66,16 @@ import org.slf4j.LoggerFactory;
 
 public class LogStateMachine extends BaseStateMachine {
   public static final Logger LOG = LoggerFactory.getLogger(LogStateMachine.class);
+  private RatisMetricRegistry metricRegistry;
+  private Timer sizeRequestTimer;
+  private Timer readNextQueryTimer;
+  private Timer getStateTimer;
+  private Timer lastIndexQueryTimer;
+  private Timer lengthQueryTimer;
+  private Timer startIndexTimer;
+  private Timer appendRequestTimer;
+  private Timer syncRequesTimer;
+  private Timer getCloseLogTimer;
 
   public static enum State {
     OPEN, CLOSED
@@ -114,6 +127,18 @@ public class LogStateMachine extends BaseStateMachine {
     this.storage.init(raftStorage);
     this.proxy = (RaftServerProxy) server;
     this.groupId = groupId;
+    //TODO: using groupId for metric now but better to tag it with LogName
+    this.metricRegistry =
+        LogServiceMetricsRegistry.createMetricRegistryForLogService(groupId.toString());
+    this.readNextQueryTimer = metricRegistry.timer("readNextQueryTime");
+    this.startIndexTimer= metricRegistry.timer("startIndexTime");
+    this.sizeRequestTimer = metricRegistry.timer("sizeRequestTime");
+    this.getStateTimer = metricRegistry.timer("getStateTime");
+    this.lastIndexQueryTimer = metricRegistry.timer("lastIndexQueryTime");
+    this.lengthQueryTimer = metricRegistry.timer("lengthQueryTime");
+    this.syncRequesTimer = metricRegistry.timer("syncRequesTime");
+    this.appendRequestTimer = metricRegistry.timer("appendRequestTime");
+    this.getCloseLogTimer = metricRegistry.timer("getCloseLogTime");
     loadSnapshot(storage.getLatestSnapshot());
   }
 
@@ -206,17 +231,41 @@ public class LogStateMachine extends BaseStateMachine {
       switch (logServiceRequestProto.getRequestCase()) {
 
         case READNEXTQUERY:
-          return processReadRequest(logServiceRequestProto);
+          return recordTime(readNextQueryTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processReadRequest(logServiceRequestProto);
+            }
+          });
         case SIZEREQUEST:
-          return processGetSizeRequest(logServiceRequestProto);
+          return recordTime(sizeRequestTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processGetSizeRequest(logServiceRequestProto);
+            }
+          });
         case STARTINDEXQUERY:
-          return processGetStartIndexRequest(logServiceRequestProto);
+          return recordTime(startIndexTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processGetStartIndexRequest(logServiceRequestProto);
+            }
+          });
         case GETSTATE:
-          return processGetStateRequest(logServiceRequestProto);
+          return recordTime(getStateTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processGetStateRequest(logServiceRequestProto);
+            }
+          });
         case LASTINDEXQUERY:
-          return processGetLastCommittedIndexRequest(logServiceRequestProto);
+          return recordTime(lastIndexQueryTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processGetLastCommittedIndexRequest(logServiceRequestProto);
+            }
+          });
         case LENGTHQUERY:
-          return processGetLengthRequest(logServiceRequestProto);
+          return recordTime(lengthQueryTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processGetLengthRequest(logServiceRequestProto);
+            }
+          });
         default:
           // TODO
           throw new RuntimeException(
@@ -232,7 +281,7 @@ public class LogStateMachine extends BaseStateMachine {
 
   /**
    * Process get start index request
-   * @param msg message
+   * @param proto message
    * @return reply message
    */
   private CompletableFuture<Message>
@@ -247,7 +296,7 @@ public class LogStateMachine extends BaseStateMachine {
 
   /**
    * Process get last committed record index
-   * @param msg message
+   * @param proto message
    * @return reply message
    */
   private CompletableFuture<Message>
@@ -262,7 +311,7 @@ public class LogStateMachine extends BaseStateMachine {
 
   /**
    * Process get length request
-   * @param msg message
+   * @param proto message
    * @return reply message
    */
   private CompletableFuture<Message> processGetSizeRequest(LogServiceRequestProto proto) {
@@ -282,7 +331,7 @@ public class LogStateMachine extends BaseStateMachine {
   }
   /**
    * Process read log entries request
-   * @param msg message
+   * @param proto message
    * @return reply message
    */
   private CompletableFuture<Message> processReadRequest(LogServiceRequestProto proto) {
@@ -377,11 +426,20 @@ public class LogStateMachine extends BaseStateMachine {
           LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
       switch (logServiceRequestProto.getRequestCase()) {
         case CLOSELOG:
-          return processCloseLog(logServiceRequestProto);
+          return recordTime(getCloseLogTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processCloseLog(logServiceRequestProto);
+            }});
         case APPENDREQUEST:
-          return processAppendRequest(trx, logServiceRequestProto);
+          return recordTime(appendRequestTimer, new Task(){
+              @Override public CompletableFuture<Message> run() {
+                return processAppendRequest(trx, logServiceRequestProto);
+              }});
         case SYNCREQUEST:
-          return processSyncRequest(trx, logServiceRequestProto);
+          return recordTime(syncRequesTimer, new Task(){
+            @Override public CompletableFuture<Message> run() {
+              return processSyncRequest(trx, logServiceRequestProto);
+            }});
         default:
           //TODO
           return null;
@@ -396,7 +454,6 @@ public class LogStateMachine extends BaseStateMachine {
 
   private CompletableFuture<Message> processCloseLog(LogServiceRequestProto logServiceRequestProto) {
     CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
-    LogName logName = LogServiceProtoUtil.toLogName(closeLog.getLogName());
     // Need to check whether the file is opened if opened close it.
     // TODO need to handle exceptions while operating with files.
     return CompletableFuture.completedFuture(Message
@@ -408,7 +465,6 @@ public class LogStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> processGetStateRequest(
       LogServiceRequestProto logServiceRequestProto) {
     GetStateRequestProto getState = logServiceRequestProto.getGetState();
-    LogName logName = LogServiceProtoUtil.toLogName(getState.getLogName());
     return CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
         .toGetStateReplyProto(state == State.OPEN).toByteString()));
   }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 24f5263..3a255ad 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -32,6 +32,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.logservice.api.LogInfo;
@@ -39,6 +41,7 @@ import org.apache.ratis.logservice.api.LogName;
 import org.apache.ratis.logservice.common.LogAlreadyExistException;
 import org.apache.ratis.logservice.common.LogNotFoundException;
 import org.apache.ratis.logservice.common.NoEnoughWorkersException;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogReplyProto;
 import org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogRequestProto;
@@ -50,6 +53,7 @@ import org.apache.ratis.logservice.proto.MetaServiceProtos.LogServiceUnregisterL
 import org.apache.ratis.logservice.proto.MetaServiceProtos.MetaSMRequestProto;
 import org.apache.ratis.logservice.util.LogServiceProtoUtil;
 import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
@@ -100,6 +104,7 @@ public class MetaStateMachine extends BaseStateMachine {
 
     private RaftGroupId metadataGroupId;
     private RaftGroupId logServerGroupId;
+    private RatisMetricRegistry metricRegistry;
 
     public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId logServerGroupId) {
       this.metadataGroupId = metadataGroupId;
@@ -109,6 +114,8 @@ public class MetaStateMachine extends BaseStateMachine {
     @Override
     public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage storage) throws IOException {
         this.raftServer = server;
+        this.metricRegistry = LogServiceMetricsRegistry
+            .createMetricRegistryForLogServiceMetaData(getClass().getSimpleName());
         super.initialize(server, groupId, storage);
     }
 
@@ -172,27 +179,31 @@ public class MetaStateMachine extends BaseStateMachine {
 
     @Override
     public CompletableFuture<Message> query(Message request) {
-        if (currentGroup == null) {
-            try {
-                List<RaftGroup> x = StreamSupport.stream(raftServer.getGroups().spliterator(), false)
-                    .filter(group -> group.getGroupId().equals(metadataGroupId))
-                    .collect(Collectors.toList());
-                if (x.size() == 1) {
-                    currentGroup = x.get(0);
+        Timer.Context timerContext = null;
+        MetaServiceProtos.MetaServiceRequestProto.TypeCase type = null;
+        try {
+            if (currentGroup == null) {
+                try {
+                    List<RaftGroup> x =
+                        StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+                            .filter(group -> group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
+                    if (x.size() == 1) {
+                        currentGroup = x.get(0);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
                 }
-            } catch (IOException e) {
+            }
+            RaftProperties properties = new RaftProperties();
+            MetaServiceProtos.MetaServiceRequestProto req = null;
+            try {
+                req = MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
+            } catch (InvalidProtocolBufferException e) {
                 e.printStackTrace();
             }
-        }
-        RaftProperties properties = new RaftProperties();
-        MetaServiceProtos.MetaServiceRequestProto req = null;
-        try {
-            req =  MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
-        } catch (InvalidProtocolBufferException e) {
-            e.printStackTrace();
-        }
-        MetaServiceProtos.MetaServiceRequestProto.TypeCase type = req.getTypeCase();
-        switch (type) {
+            type = req.getTypeCase();
+            timerContext = metricRegistry.timer(type.name()).time();
+            switch (type) {
 
             case CREATELOG:
                 return processCreateLogRequest(req);
@@ -204,10 +215,15 @@ public class MetaStateMachine extends BaseStateMachine {
                 return processArchiveLog(req);
             case DELETELOG:
                 return processDeleteLog(req);
-                default:
+            default:
+            }
+            CompletableFuture<Message> reply = super.query(request);
+            return reply;
+        }finally{
+            if (timerContext != null) {
+                timerContext.stop();
+            }
         }
-        CompletableFuture<Message> reply = super.query(request);
-        return reply;
     }
 
 
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index e378068..166702e 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -29,6 +30,8 @@ import java.util.stream.LongStream;
 import java.util.Iterator;
 import java.util.List;
 
+import javax.management.ObjectName;
+
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
@@ -40,6 +43,7 @@ import org.apache.ratis.logservice.api.LogStream;
 import org.apache.ratis.logservice.api.LogStream.State;
 import org.apache.ratis.logservice.api.LogWriter;
 import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
 import org.apache.ratis.logservice.server.LogStateMachine;
 import org.apache.ratis.logservice.util.TestUtils;
 import org.apache.ratis.statemachine.StateMachine;
@@ -64,6 +68,46 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
   static final int NUM_PEERS = 3;
   CLUSTER cluster;
 
+  class MetricLogStream extends LogStreamImpl{
+    Long startRecordIdCount = 0l;
+    Long getStateCount = 0l;
+    Long getLastRecordIdCount = 0l;
+    Long getLengthCount = 0l;
+    Long getSizeCount = 0l;
+
+    public MetricLogStream(LogName name, RaftClient raftClient) {
+      super(name, raftClient);
+    }
+
+    @Override public long getStartRecordId() throws IOException {
+      startRecordIdCount++;
+      return super.getStartRecordId();
+    }
+
+    @Override public State getState() {
+      getStateCount++;
+      return super.getState();
+    }
+
+    @Override public long getLastRecordId() throws IOException {
+      getLastRecordIdCount++;
+      return super.getLastRecordId();
+    }
+
+    @Override public long getLength() throws IOException {
+      getLengthCount++;
+      return super.getLength();
+    }
+
+    @Override public long getSize() throws IOException {
+      getSizeCount++;
+      return super.getSize();
+    }
+
+    @Override public LogName getName() {
+      return super.getName();
+    }
+  }
   @Before
   public void setUpCluster() throws IOException, InterruptedException {
     cluster = newCluster(NUM_PEERS);
@@ -78,11 +122,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
             .build();
     LogName logName = LogName.of("log1");
     // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
       assertEquals("log1", logStream.getName().getName());
       assertEquals(State.OPEN, logStream.getState());
       assertEquals(0, logStream.getSize());
       assertEquals(0, logStream.getLength());
+      testJMXMetrics(logStream);
 
       LogReader reader = logStream.createReader();
       LogWriter writer = logStream.createWriter();
@@ -120,6 +165,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
         ByteBuffer actual = actualIter.next();
         assertEquals(expected, actual);
       }
+      testJMXMetrics(logStream);
     }
   }
 
@@ -131,7 +177,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
     final LogName logName = LogName.of("log1");
     final int numRecords = 25;
     // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
         // Write records 0 through 99 (inclusive)
@@ -172,7 +218,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER 
extends MiniRaftCluster>
     final LogName logName = LogName.of("log1");
     final int numRecords = 100;
     // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
         // Write records 0 through 99 (inclusive)
@@ -208,7 +254,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER 
extends MiniRaftCluster>
             .build();
     final LogName logName = LogName.of("log1");
     final int numRecords = 10;
-    try (LogStream logStream = new LogStreamImpl(logName, raftClient)) {
+    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
       final List<Long> recordIds;
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
@@ -258,4 +304,18 @@ public abstract class LogServiceReadWriteBase<CLUSTER 
extends MiniRaftCluster>
     System.arraycopy(bb.array(), bb.arrayOffset(), bytes, 0, bb.remaining());
     return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
   }
+
+  private void testJMXMetrics(LogStream logStream) throws Exception {
+    assertEquals(((MetricLogStream) logStream).getLengthCount,
+        getJMXCount(cluster.getGroup().getGroupId().toString(), 
"lengthQueryTime"));
+    assertEquals(((MetricLogStream) logStream).getSizeCount,
+        getJMXCount(cluster.getGroup().getGroupId().toString(), 
"sizeRequestTime"));
+
+  }
+
+  private Long getJMXCount(String groupId, String metricName) throws Exception 
{
+    ObjectName oname = new ObjectName(LogServiceMetricsRegistry.JMX_DOMAIN, 
"name",
+        groupId + "." + 
LogServiceMetricsRegistry.RATIS_LOG_SERVICE_METRICS_CONTEXT + "." + metricName);
+    return (Long) 
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count");
+  }
 }
diff --git 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 47aa60a..7dab2f5 100644
--- 
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ 
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -22,7 +22,8 @@ import org.apache.ratis.logservice.api.*;
 import org.apache.ratis.logservice.client.LogServiceClient;
 import org.apache.ratis.logservice.common.LogAlreadyExistException;
 import org.apache.ratis.logservice.common.LogNotFoundException;
-import org.apache.ratis.logservice.server.LogServer;
+import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.MetaServiceProtos;
 import org.apache.ratis.logservice.util.LogServiceCluster;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
@@ -32,8 +33,10 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
@@ -41,11 +44,30 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import javax.management.ObjectName;
 
 public class TestMetaServer {
 
     static LogServiceCluster cluster = null;
+    static AtomicInteger createCount = new AtomicInteger();
+    static AtomicInteger deleteCount = new AtomicInteger();
+    static AtomicInteger listCount = new AtomicInteger();
+    LogServiceClient client = new LogServiceClient(cluster.getMetaIdentity()){
+        @Override public LogStream createLog(LogName logName) throws 
IOException {
+            createCount.incrementAndGet();
+            return super.createLog(logName);
+        }
+
+        @Override public void deleteLog(LogName logName) throws IOException {
+            deleteCount.incrementAndGet();
+            super.deleteLog(logName);
+        }
 
+        @Override public List<LogInfo> listLogs() throws IOException {
+            listCount.incrementAndGet();
+            return super.listLogs();
+        }
+    };
     @BeforeClass
     public static void beforeClass() {
         cluster = new LogServiceCluster(3);
@@ -66,19 +88,19 @@ public class TestMetaServer {
      * @throws IOException
      */
     @Test
-    public void testCreateAndGetLog() throws IOException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
+    public void testCreateAndGetLog() throws Exception {
+
         // This should be LogServiceStream ?
         LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
         assertNotNull(logStream1);
         LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
+        
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.GETLOG.name(),1l);
         assertNotNull(logStream2);
     }
 
 
     @Test
     public void testReadWritetoLog() throws IOException, InterruptedException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
         LogStream stream = client.createLog(LogName.of("testReadWrite"));
         LogWriter writer = stream.createWriter();
         ByteBuffer testMessage =  ByteBuffer.wrap("Hello world!".getBytes());
@@ -107,12 +129,13 @@ public class TestMetaServer {
      */
 
     @Test
-    public void testDeleteLog() throws IOException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
+    public void testDeleteLog() throws Exception {
         // This should be LogServiceStream ?
         LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"));
         assertNotNull(logStream1);
         client.deleteLog(LogName.of("testDeleteLog"));
+        
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
+            (long) deleteCount.get());
         try {
           logStream1 = client.getLog(LogName.of("testDeleteLog"));
             fail("Failed to throw LogNotFoundException");
@@ -128,7 +151,6 @@ public class TestMetaServer {
      */
     @Test
     public void testGetNotExistingLog() {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
         try {
             LogStream log = client.getLog(LogName.of("no_such_log"));
             fail("LogNotFoundException was not thrown");
@@ -142,8 +164,7 @@ public class TestMetaServer {
      * @throws IOException
      */
     @Test
-    public void testAlreadyExistLog() throws IOException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
+    public void testAlreadyExistLog() throws Exception {
         LogStream logStream1 = client.createLog(LogName.of("test1"));
         assertNotNull(logStream1);
         try {
@@ -159,8 +180,7 @@ public class TestMetaServer {
      * @throws IOException
      */
     @Test
-    public void testListLogs() throws IOException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
+    public void testListLogs() throws Exception {
         client.createLog(LogName.of("listLogTest1"));
         client.createLog(LogName.of("listLogTest2"));
         client.createLog(LogName.of("listLogTest3"));
@@ -168,15 +188,31 @@ public class TestMetaServer {
         client.createLog(LogName.of("listLogTest5"));
         client.createLog(LogName.of("listLogTest6"));
         client.createLog(LogName.of("listLogTest7"));
+        // Test jmx
+
         List<LogInfo> list = client.listLogs();
+        
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.CREATELOG.name(),
+            (long) createCount.get() );
+        
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.LISTLOGS.name(),listCount.longValue());
         assert(list.stream().filter(log -> 
log.getLogName().getName().startsWith("listLogTest")).count() == 7);
 
     }
 
+    private void testJMXCount(String metricName, Long expectedCount) throws 
Exception {
+        assertEquals(expectedCount, getJMXCount(metricName));
+    }
+
+    private Long getJMXCount(String metricName) throws Exception{
+        ObjectName oname = new 
ObjectName(LogServiceMetricsRegistry.JMX_DOMAIN, "name",
+            MetaStateMachine.class.getSimpleName() + "."
+                + 
LogServiceMetricsRegistry.RATIS_LOG_SERVICE_META_DATA_METRICS_CONTEXT + "."
+                + metricName);
+        return (Long) 
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count");
+    }
+
     @Ignore ("Too heavy for the current implementation")
     @Test
-    public void testFinalClieanUp() throws IOException {
-        LogServiceClient client = new 
LogServiceClient(cluster.getMetaIdentity());
+    public void testFinalClieanUp() throws Exception {
         IntStream.range(0, 10).forEach(i -> {
             try {
                 client.createLog(LogName.of("CleanTest" + i));
diff --git a/ratis-server/pom.xml b/ratis-metrics/pom.xml
similarity index 78%
copy from ratis-server/pom.xml
copy to ratis-metrics/pom.xml
index 4ccb101..fde261b 100644
--- a/ratis-server/pom.xml
+++ b/ratis-metrics/pom.xml
@@ -20,55 +20,41 @@
     <version>0.4.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>ratis-server</artifactId>
-  <name>Apache Ratis Server</name>
+  <artifactId>ratis-metrics</artifactId>
+  <name>Apache Ratis Metrics</name>
 
   <dependencies>
     <dependency>
-      <groupId>org.apache.ratis</groupId>
-      <artifactId>ratis-thirdparty-misc</artifactId>
-    </dependency>
-    <dependency>
       <artifactId>ratis-proto</artifactId>
       <groupId>org.apache.ratis</groupId>
     </dependency>
-
     <dependency>
-      <artifactId>ratis-common</artifactId>
-      <groupId>org.apache.ratis</groupId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
-      <artifactId>ratis-common</artifactId>
-      <groupId>org.apache.ratis</groupId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
       <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-
-    <dependency>
-      <artifactId>ratis-client</artifactId>
-      <groupId>org.apache.ratis</groupId>
     </dependency>
     <dependency>
-      <artifactId>ratis-client</artifactId>
-      <groupId>org.apache.ratis</groupId>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
       <scope>test</scope>
-      <type>test-jar</type>
     </dependency>
-
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <artifactId>ratis-common</artifactId>
+       <groupId>org.apache.ratis</groupId>
     </dependency>
-
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
       <scope>test</scope>
+      <type>test-jar</type>
     </dependency>
   </dependencies>
 </project>
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java
new file mode 100644
index 0000000..97c81cf
--- /dev/null
+++ b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistries.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+/**
+ * MetricRegistries is a collection of MetricRegistry's. MetricRegistries
+ * implementations should do ref-counting of MetricRegistry's via the create()
+ * and remove() methods.
+ */
+public abstract class MetricRegistries {
+
+  private static final class LazyHolder {
+    private static final MetricRegistries GLOBAL = 
MetricRegistriesLoader.load();
+  }
+
+  /**
+   * Return the global singleton instance for the MetricRegistries.
+   * @return MetricRegistries implementation.
+   */
+  public static MetricRegistries global() {
+    return LazyHolder.GLOBAL;
+  }
+
+  /**
+   * Removes all the MetricRegistries.
+   */
+  public abstract void clear();
+
+  /**
+   * Create or return the MetricRegistry for the given info. A MetricRegistry is only
+   * created if the current reference count is 0. Otherwise the ref count is incremented
+   * and the existing instance is returned.
+   * @param info the info object for the MetricRegistry.
+   * @return created or existing MetricRegistry.
+   */
+  public abstract RatisMetricRegistry create(MetricRegistryInfo info);
+
+  /**
+   * Decrements the ref count of the MetricRegistry, and removes it if the ref count == 0.
+   * @param key the info object for the MetricRegistry.
+   * @return true if metric registry is removed.
+   */
+  public abstract boolean remove(MetricRegistryInfo key);
+
+  /**
+   * Returns the MetricRegistry if found.
+   * @param info the info for the registry.
+   * @return a MetricRegistry optional.
+   */
+  public abstract Optional<RatisMetricRegistry> get(MetricRegistryInfo info);
+
+  /**
+   * Returns MetricRegistryInfo's for the MetricRegistry's created.
+   * @return MetricRegistryInfo's for the MetricRegistry's created.
+   */
+  public abstract Set<MetricRegistryInfo> getMetricRegistryInfos();
+
+  /**
+   * Returns MetricRegistry's created.
+   * @return MetricRegistry's created.
+   */
+  public abstract Collection<RatisMetricRegistry> getMetricRegistries();
+}
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
new file mode 100644
index 0000000..9e414ad
--- /dev/null
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistriesLoader.java
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public final class MetricRegistriesLoader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricRegistries.class);
+
+  private static final String defaultClass
+      = "org.apache.ratis.metrics.impl.MetricRegistriesImpl";
+
+  private MetricRegistriesLoader() {
+  }
+
+  /**
+   * Creates a {@link MetricRegistries} instance using the corresponding 
{@link MetricRegistries}
+   * available to {@link ServiceLoader} on the classpath. If no instance is 
found, then default
+   * implementation will be loaded.
+   * @return A {@link MetricRegistries} implementation.
+   */
+  public static MetricRegistries load() {
+    List<MetricRegistries> availableImplementations = 
getDefinedImplemantations();
+    return load(availableImplementations);
+  }
+
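+  /**
+   * Selects the {@link MetricRegistries} instance to use from the given candidates. A single
+   * candidate is returned as is. If the list is empty, the default implementation
+   * ({@code defaultClass}) is instantiated. If there are several candidates, a warning is
+   * logged and the first one is returned.
+   * @param availableImplementations the candidate implementations, e.g. those found via
+   *        {@link ServiceLoader}.
+   * @return A {@link MetricRegistries} implementation.
+   */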
+  @VisibleForTesting
+  static MetricRegistries load(List<MetricRegistries> 
availableImplementations) {
+
+    if (availableImplementations.size() == 1) {
+      // One and only one instance -- what we want/expect
+      MetricRegistries impl = availableImplementations.get(0);
+      LOG.info("Loaded MetricRegistries " + impl.getClass());
+      return impl;
+    } else if (availableImplementations.isEmpty()) {
+      try {
+        return 
ReflectionUtils.newInstance((Class<MetricRegistries>)Class.forName(defaultClass));
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      // Tell the user they're doing something wrong, and choose the first 
impl.
+      StringBuilder sb = new StringBuilder();
+      for (MetricRegistries factory : availableImplementations) {
+        if (sb.length() > 0) {
+          sb.append(", ");
+        }
+        sb.append(factory.getClass());
+      }
+      LOG.warn("Found multiple MetricRegistries implementations: " + sb
+          + ". Using first found implementation: " + 
availableImplementations.get(0));
+      return availableImplementations.get(0);
+    }
+  }
+
+  private static List<MetricRegistries> getDefinedImplemantations() {
+    ServiceLoader<MetricRegistries> loader = 
ServiceLoader.load(MetricRegistries.class);
+    List<MetricRegistries> availableFactories = new ArrayList<>();
+    for (MetricRegistries impl : loader) {
+      availableFactories.add(impl);
+    }
+    return availableFactories;
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
similarity index 66%
copy from 
ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
copy to 
ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
index c5601ed..68d3410 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryFactory.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,20 +17,20 @@
  * limitations under the License.
  */
 
+
 package org.apache.ratis.metrics;
 
-import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
 
-public class RatisMetricsRegistry {
-
-  private static final MetricRegistry metricsRegistry = new MetricRegistry();
-
-  static {
-    
JmxReporter.forRegistry(RatisMetricsRegistry.getRegistry()).build().start();
-  }
-
-  public static MetricRegistry getRegistry() {
-    return metricsRegistry;
-  }
+/**
+ * A Factory for creating MetricRegistry instances. This is the main plugin point for the
+ * metrics implementation.
+ */
+public interface MetricRegistryFactory {
+  /**
+   * Create a MetricRegistry from the given MetricRegistryInfo
+   * @param info the descriptor for MetricRegistry
+   * @return a MetricRegistry implementation
+   */
+  RatisMetricRegistry create(MetricRegistryInfo info);
 }
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java
new file mode 100644
index 0000000..e96bb8b
--- /dev/null
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/MetricRegistryInfo.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+
+import java.util.Objects;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ *
+ * This class holds the name, description and JMX related context name for a group of
+ * metrics.
+ */
+public class MetricRegistryInfo {
+
+  protected final String metricsName;
+  protected final String metricsDescription;
+  protected final String metricsContext;
+  private final String fullName;
+
+  public MetricRegistryInfo(
+      String metricsName,
+      String metricsDescription,
+      String metricsContext) {
+    this.metricsName = metricsName;
+    this.metricsDescription = metricsDescription;
+    this.metricsContext = metricsContext;
+    this.fullName = MetricRegistry.name(metricsName,metricsContext);
+  }
+
+  /**
+   *
+   * @return The string context
+   */
+  public String getMetricsContext() {
+    return metricsContext;
+  }
+
+  /**
+   * Get the description of what this metric registry exposes.
+   */
+  public String getMetricsDescription() {
+    return metricsDescription;
+  }
+
+  /**
+   * Get the name of the metrics that are being exported by this registry.
+   */
+  public String getMetricsName() {
+    return metricsName;
+  }
+
+  /**
+   * Compares the name, description and context of the two infos field by field; these are
+   * the same fields that {@link #hashCode()} hashes. Comparing hash codes alone would treat
+   * two different infos whose hashes happen to collide as equal.
+   * @param obj the object to compare with.
+   * @return true if {@code obj} is a MetricRegistryInfo with equal name, description and
+   *         context.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(obj instanceof MetricRegistryInfo)) {
+      return false;
+    }
+    final MetricRegistryInfo that = (MetricRegistryInfo) obj;
+    return Objects.equals(metricsName, that.metricsName)
+        && Objects.equals(metricsDescription, that.metricsDescription)
+        && Objects.equals(metricsContext, that.metricsContext);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(metricsName, metricsDescription, metricsContext);
+  }
+
+  public String getName() {
+    return fullName;
+  }
+}
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
new file mode 100644
index 0000000..e43f7b9
--- /dev/null
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistriesImpl.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryFactory;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+
+/**
+ * Implementation of MetricRegistries that does ref-counting.
+ */
+public class MetricRegistriesImpl extends MetricRegistries {
+  private final MetricRegistryFactory factory;
+  private final RefCountingMap<MetricRegistryInfo, RatisMetricRegistry> 
registries;
+
+  public MetricRegistriesImpl() {
+    this(new MetricRegistryFactoryImpl());
+  }
+
+  public MetricRegistriesImpl(MetricRegistryFactory factory) {
+    this.factory = factory;
+    this.registries = new RefCountingMap<>();
+  }
+
+  @Override
+  public RatisMetricRegistry create(MetricRegistryInfo info) {
+    return registries.put(info, () -> factory.create(info));
+  }
+
+  @Override
+  public boolean remove(MetricRegistryInfo key) {
+    return registries.remove(key) == null;
+  }
+
+  @Override
+  public Optional<RatisMetricRegistry> get(MetricRegistryInfo info) {
+    return Optional.ofNullable(registries.get(info));
+  }
+
+  @Override
+  public Collection<RatisMetricRegistry> getMetricRegistries() {
+    return Collections.unmodifiableCollection(registries.values());
+  }
+
+  @Override
+  public void clear() {
+    registries.clear();
+  }
+
+  @Override
+  public Set<MetricRegistryInfo> getMetricRegistryInfos() {
+    return Collections.unmodifiableSet(registries.keySet());
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
similarity index 69%
rename from 
ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
rename to 
ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
index c5601ed..622f1a7 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/MetricRegistryFactoryImpl.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,30 +7,24 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p>
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.metrics.impl;
 
-package org.apache.ratis.metrics;
-
-import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.ratis.metrics.MetricRegistryFactory;
+import org.apache.ratis.metrics.MetricRegistryInfo;
 
-public class RatisMetricsRegistry {
-
-  private static final MetricRegistry metricsRegistry = new MetricRegistry();
-
-  static {
-    
JmxReporter.forRegistry(RatisMetricsRegistry.getRegistry()).build().start();
-  }
-
-  public static MetricRegistry getRegistry() {
-    return metricsRegistry;
+public class MetricRegistryFactoryImpl implements MetricRegistryFactory {
+  @Override
+  public RatisMetricRegistry create(MetricRegistryInfo info) {
+    return new RatisMetricRegistry(info);
   }
 }
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
new file mode 100644
index 0000000..41d4ad3
--- /dev/null
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RatisMetricRegistry.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricRegistry.MetricSupplier;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
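+ * Wrapper around a Dropwizard {@link MetricRegistry} that prefixes the metric names passed
+ * to it with the name of the {@link MetricRegistryInfo} it was created with.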
+ */
+public class RatisMetricRegistry{
+  MetricRegistry metricRegistry = new MetricRegistry();
+
+  private final MetricRegistryInfo info;
+
+  public RatisMetricRegistry(MetricRegistryInfo info) {
+    super();
+    this.info = info;
+  }
+
+  public Timer timer(String name) {
+    return metricRegistry.timer(getMetricName(name));
+  }
+
+  public Counter counter(String name) {
+    return metricRegistry.counter(getMetricName(name));
+  }
+
+  public Gauge gauge(String name, MetricSupplier<Gauge> supplier) {
+    return metricRegistry.gauge(getMetricName(name), supplier);
+  }
+
+  public Timer timer(String name, MetricSupplier<Timer> supplier) {
+    return metricRegistry.timer(getMetricName(name), supplier);
+  }
+
+  public SortedMap<String, Gauge> getGauges(MetricFilter filter) {
+    return metricRegistry.getGauges(filter);
+  }
+
+  public Counter counter(String name, MetricSupplier<Counter> supplier) {
+    return metricRegistry.counter(getMetricName(name), supplier);
+  }
+
+  public Histogram histogram(String name) {
+    return metricRegistry.histogram(getMetricName(name));
+  }
+
+   public Meter meter(String name) {
+    return metricRegistry.meter(getMetricName(name));
+  }
+
+  public Meter meter(String name, MetricSupplier<Meter> supplier) {
+    return metricRegistry.meter(getMetricName(name), supplier);
+  }
+
+  @VisibleForTesting
+  public Metric get(String shortName) {
+    return metricRegistry.getMetrics().get(getMetricName(shortName));
+  }
+
+  private String getMetricName(String shortName) {
+    return MetricRegistry.name(info.getName(), shortName);
+  }
+
+  public <T extends Metric> T register(String name, T metric) throws 
IllegalArgumentException {
+    return metricRegistry.register(getMetricName(name), metric);
+  }
+
+
+  public MetricRegistry getDropWizardMetricRegistry() {
+    return metricRegistry;
+  }
+}
diff --git 
a/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java
new file mode 100644
index 0000000..07112d1
--- /dev/null
+++ 
b/ratis-metrics/src/main/java/org/apache/ratis/metrics/impl/RefCountingMap.java
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * A map of K to V that reference-counts its entries. Values are never added directly:
+ * {@link #put} asks the given Supplier for a value only when the key is not yet present, and
+ * otherwise just increments the ref count of the existing entry. Each {@link #remove}
+ * decrements the ref count, and the entry is dropped from the map iff the count reaches zero.
+ *
+ * <p>Ref-count changes are made inside {@code ConcurrentHashMap.compute} and
+ * {@code ConcurrentHashMap.computeIfPresent} on the backing map.
+ */
+class RefCountingMap<K, V> {
+
+  private final ConcurrentHashMap<K, Payload<V>> map = new ConcurrentHashMap<>();
+
+  /** Pairs a stored value with the number of outstanding references to it. */
+  private static class Payload<V> {
+    final V v;
+    int refCount;
+
+    Payload(V v) {
+      this.v = v;
+      this.refCount = 1; // create with ref count = 1
+    }
+  }
+
+  /**
+   * Adds a reference to k. The supplier is consulted only when k has no entry yet.
+   * @param k the key to reference
+   * @param supplier source of the value for a key that is not yet present
+   * @return the value now associated with k (the existing one if k was already present)
+   */
+  V put(K k, Supplier<V> supplier) {
+    return map.compute(k, (key, existing) -> {
+      if (existing != null) {
+        existing.refCount++;
+        return existing;
+      }
+      return new Payload<>(supplier.get());
+    }).v;
+  }
+
+  /**
+   * Reads the value for k without touching its ref count.
+   * @param k the key to look up
+   * @return the value associated with k, or null if k has no entry
+   */
+  V get(K k) {
+    final Payload<V> p = map.get(k);
+    return p == null ? null : p.v;
+  }
+
+  /**
+   * Decrements the ref count of k, and removes from map if ref count == 0.
+   * @param k the key to remove
+   * @return the value associated with the specified key or null if key is removed from map.
+   */
+  V remove(K k) {
+    // Returning null from the remapping function deletes the entry.
+    final Payload<V> p = map.computeIfPresent(k, (key, payload) -> --payload.refCount <= 0 ? null : payload);
+    return p == null ? null : p.v;
+  }
+
+  /** Drops every entry regardless of its ref count. */
+  void clear() {
+    map.clear();
+  }
+
+  /**
+   * @return the key set of the backing map
+   */
+  Set<K> keySet() {
+    return map.keySet();
+  }
+
+  /**
+   * @return a newly collected list holding the value of every entry
+   */
+  Collection<V> values() {
+    return map.values().stream().map(p -> p.v).collect(Collectors.toList());
+  }
+
+  /**
+   * @return the number of distinct keys currently held
+   */
+  int size() {
+    return map.size();
+  }
+}
diff --git 
a/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
 
b/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
new file mode 100644
index 0000000..298825f
--- /dev/null
+++ 
b/ratis-metrics/src/main/resources/META-INF/services/org.apache.hadoop.hbase.metrics.MetricRegistries
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.ratis.metrics.impl.MetricRegistriesImpl
diff --git 
a/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
 
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
new file mode 100644
index 0000000..8429ded
--- /dev/null
+++ 
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/TestMetricRegistriesLoader.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.mockito.Mockito.mock;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.junit.Test;
+
+
+/**
+ * Test class for {@link MetricRegistriesLoader}.
+ */
+public class TestMetricRegistriesLoader {
+
+  /** With a single candidate, load() must hand back that candidate. */
+  @Test
+  public void testLoadSingleInstance() {
+    MetricRegistries loader = mock(MetricRegistries.class);
+    MetricRegistries instance = MetricRegistriesLoader.load(Lists.newArrayList(loader));
+    assertEquals(loader, instance);
+  }
+
+  /** With several candidates, load() must pick the first one in the list. */
+  @Test
+  public void testLoadMultipleInstances() {
+    MetricRegistries loader1 = mock(MetricRegistries.class);
+    MetricRegistries loader2 = mock(MetricRegistries.class);
+    MetricRegistries loader3 = mock(MetricRegistries.class);
+    MetricRegistries instance =
+        MetricRegistriesLoader.load(Lists.newArrayList(loader1, loader2, loader3));
+
+    // the load() returns the first instance
+    assertEquals(loader1, instance);
+    assertNotEquals(loader2, instance);
+    assertNotEquals(loader3, instance);
+  }
+}
diff --git 
a/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
 
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
new file mode 100644
index 0000000..a8c54d7
--- /dev/null
+++ 
b/ratis-metrics/src/test/java/org/apache/ratis/metrics/impl/TestRefCountingMap.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.metrics.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRefCountingMap {
+
+  private RefCountingMap<String, String> map;
+
+  /** Gives every test a fresh, empty map. */
+  @Before
+  public void setUp() {
+    map = new RefCountingMap<>();
+  }
+
+  /** A value supplied on the first put is visible through get. */
+  @Test
+  public void testPutGet() {
+    map.put("foo", () -> "foovalue");
+
+    final String stored = map.get("foo");
+    assertNotNull(stored);
+    assertEquals("foovalue", stored);
+  }
+
+  /** Repeated puts of one key keep returning the value from the first supplier. */
+  @Test
+  public void testPutMulti() {
+    final String first = map.put("foo", () -> "foovalue");
+    final String second = map.put("foo", () -> "foovalue2");
+    final String third = map.put("foo", () -> "foovalue3");
+
+    final String current = map.get("foo");
+    assertEquals("foovalue", current);
+    assertEquals(current, first);
+    assertEquals(current, second);
+    assertEquals(current, third);
+  }
+
+  /** One put followed by one remove leaves no entry behind. */
+  @Test
+  public void testPutRemove() {
+    map.put("foo", () -> "foovalue");
+
+    final String afterRemove = map.remove("foo");
+    assertNull(afterRemove);
+
+    final String afterGet = map.get("foo");
+    assertNull(afterGet);
+  }
+
+  /** The entry survives until as many removes as puts have been issued. */
+  @Test
+  public void testPutRemoveMulti() {
+    map.put("foo", () -> "foovalue");
+    map.put("foo", () -> "foovalue2");
+    map.put("foo", () -> "foovalue3");
+
+    // remove 1
+    final String afterFirst = map.remove("foo");
+    assertEquals("foovalue", afterFirst);
+
+    // remove 2
+    final String afterSecond = map.remove("foo");
+    assertEquals("foovalue", afterSecond);
+
+    // remove 3
+    final String afterThird = map.remove("foo");
+    assertNull(afterThird);
+    final String afterGet = map.get("foo");
+    assertNull(afterGet);
+  }
+
+  /** size() counts distinct keys, not references. */
+  @Test
+  public void testSize() {
+    assertEquals(0, map.size());
+
+    // put a key
+    map.put("foo", () -> "foovalue");
+    assertEquals(1, map.size());
+
+    // put a different key
+    map.put("bar", () -> "foovalue2");
+    assertEquals(2, map.size());
+
+    // put the same key again
+    map.put("bar", () -> "foovalue3");
+    assertEquals(2, map.size()); // map should be same size
+  }
+
+  /** clear() empties the map. */
+  @Test
+  public void testClear() {
+    map.put("foo", () -> "foovalue");
+    map.put("bar", () -> "foovalue2");
+    map.put("baz", () -> "foovalue3");
+
+    map.clear();
+
+    assertEquals(0, map.size());
+  }
+
+  /** keySet() contains exactly the keys that were put. */
+  @Test
+  public void testKeySet() {
+    map.put("foo", () -> "foovalue");
+    map.put("bar", () -> "foovalue2");
+    map.put("baz", () -> "foovalue3");
+
+    final Set<String> keys = map.keySet();
+    assertEquals(3, keys.size());
+
+    for (String expected : Lists.newArrayList("foo", "bar", "baz")) {
+      assertTrue(keys.contains(expected));
+    }
+  }
+
+  /** values() holds one value per key, taken from the first put of that key. */
+  @Test
+  public void testValues() {
+    map.put("foo", () -> "foovalue");
+    map.put("foo", () -> "foovalue2");
+    map.put("bar", () -> "foovalue3");
+    map.put("baz", () -> "foovalue4");
+
+    final Collection<String> values = map.values();
+    assertEquals(3, values.size());
+
+    for (String expected : Lists.newArrayList("foovalue", "foovalue3", "foovalue4")) {
+      assertTrue(values.contains(expected));
+    }
+  }
+}
diff --git a/ratis-server/pom.xml b/ratis-server/pom.xml
index 4ccb101..e78e780 100644
--- a/ratis-server/pom.xml
+++ b/ratis-server/pom.xml
@@ -70,5 +70,15 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ratis</groupId>
+      <artifactId>ratis-metrics</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ratis</groupId>
+      <artifactId>ratis-metrics</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
new file mode 100644
index 0000000..165072a
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import java.util.Optional;
+
+import com.codahale.metrics.JmxReporter;
+import org.apache.ratis.metrics.MetricRegistries;
+import org.apache.ratis.metrics.MetricRegistryInfo;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
+
+/**
+ * Static helpers for obtaining the metric registry used by the Raft log worker.
+ */
+public class RatisMetrics {
+  public static final String RATIS_LOG_WORKER_METRICS_DESC = "Ratis Log worker metrics";
+  public static final String RATIS_LOG_WORKER_METRICS_CONTEXT = "RaftLogWorker";
+
+  /**
+   * Returns the log worker registry for {@code name}, creating it (and starting a JMX reporter
+   * for it) when the global registries do not hold one yet.
+   *
+   * @param name the registry name
+   * @return the existing or newly created registry
+   */
+  public static RatisMetricRegistry createMetricRegistryForLogWorker(String name) {
+    return create(logWorkerInfo(name));
+  }
+
+  /**
+   * Returns the log worker registry previously created for {@code name}.
+   *
+   * @param name the registry name
+   * @return the registry held by the global registries
+   * @throws java.util.NoSuchElementException if no registry exists for {@code name}
+   */
+  public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
+    // Same exception type as a bare Optional.get(), but with a message naming the missing registry.
+    return MetricRegistries.global().get(logWorkerInfo(name))
+        .orElseThrow(() -> new java.util.NoSuchElementException(
+            "No log worker metric registry exists for " + name));
+  }
+
+  /** Builds the registry info that identifies the log worker registry for {@code name}. */
+  private static MetricRegistryInfo logWorkerInfo(String name) {
+    return new MetricRegistryInfo(name, RATIS_LOG_WORKER_METRICS_DESC,
+        RATIS_LOG_WORKER_METRICS_CONTEXT);
+  }
+
+  /**
+   * Returns the registry for {@code info} if the global registries already hold one; otherwise
+   * creates it and starts a JMX reporter in the "RatisCore" domain for it.
+   */
+  private static RatisMetricRegistry create(MetricRegistryInfo info) {
+    final Optional<RatisMetricRegistry> existing = MetricRegistries.global().get(info);
+    return existing.orElseGet(() -> {
+      final RatisMetricRegistry registry = MetricRegistries.global().create(info);
+      // NOTE(review): the reporter is started here and no reference to it is kept, so nothing
+      // in this class can stop it later - confirm that is intended.
+      JmxReporter.forRegistry(registry.getDropWizardMetricRegistry())
+          .inDomain("RatisCore")
+          .build()
+          .start();
+      return registry;
+    });
+  }
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index bdec7ea..78e7191 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -17,15 +17,18 @@
  */
 package org.apache.ratis.server.raftlog.segmented;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricsRegistry;
+import org.apache.ratis.metrics.impl.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.storage.RaftStorage;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
 import 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -52,6 +55,7 @@ class SegmentedRaftLogWorker implements Runnable {
   static final Logger LOG = 
LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
 
   static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
+  private final RatisMetricRegistry metricRegistry;
 
   static class StateMachineDataPolicy {
     private final boolean sync;
@@ -98,7 +102,7 @@ class SegmentedRaftLogWorker implements Runnable {
   private volatile SegmentedRaftLogOutputStream out;
   private final Runnable submitUpdateCommitEvent;
   private final StateMachine stateMachine;
-  private final Supplier<Timer> logFlushTimer;
+  private final Timer logFlushTimer;
 
   /**
    * The number of entries that have been written into the 
SegmentedRaftLogOutputStream but
@@ -125,12 +129,23 @@ class SegmentedRaftLogWorker implements Runnable {
 
     this.submitUpdateCommitEvent = submitUpdateCommitEvent;
     this.stateMachine = stateMachine;
-
+    this.metricRegistry = 
RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
     this.storage = storage;
-
     final SizeInBytes queueByteLimit = 
RaftServerConfigKeys.Log.queueByteLimit(properties);
     final int queueElementLimit = 
RaftServerConfigKeys.Log.queueElementLimit(properties);
-    this.queue = new DataBlockingQueue<>(name, queueByteLimit, 
queueElementLimit, Task::getSerializedSize);
+    this.queue =
+        new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, 
Task::getSerializedSize);
+
+    metricRegistry.gauge("dataQueueSize", new MetricRegistry.MetricSupplier() {
+      @Override public Metric newMetric() {
+        return new Gauge<Integer>() {
+          @Override public Integer getValue() {
+            //q.size() is O(1) operation
+            return queue.size();
+          }
+        };
+      }
+    });
 
     this.segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.preallocatedSize = 
RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
@@ -142,8 +157,7 @@ class SegmentedRaftLogWorker implements Runnable {
     this.workerThread = new Thread(this, name);
 
     // Server Id can be null in unit tests
-    this.logFlushTimer = JavaUtils.memoize(() -> 
RatisMetricsRegistry.getRegistry()
-        .timer(MetricRegistry.name(SegmentedRaftLogWorker.class, 
selfId.toString(), "flush-time")));
+    this.logFlushTimer = metricRegistry.timer("flush-time");
   }
 
   void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -262,7 +276,7 @@ class SegmentedRaftLogWorker implements Runnable {
   private void flushWrites() throws IOException {
     if (out != null) {
       LOG.debug("{}: flush {}", name, out);
-      final Timer.Context timerContext = logFlushTimer.get().time();
+      final Timer.Context timerContext = logFlushTimer.time();
       try {
         final CompletableFuture<Void> f = stateMachine != null ?
             stateMachine.flushStateMachineData(lastWrittenIndex) :
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8a09bd4..705876e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -18,6 +18,7 @@
 
 package org.apache.ratis.statemachine.impl;
 
+import com.codahale.metrics.Timer;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -197,4 +198,19 @@ public class BaseStateMachine implements StateMachine {
     return getClass().getSimpleName() + ":"
         + (!server.isDone()? "uninitialized": getId() + ":" + groupId);
   }
+
+
+  /**
+   * Runs the task while the given timer is running. The timer context is stopped as soon as
+   * {@code task.run()} returns or throws, i.e. before the returned future is inspected here.
+   *
+   * @param timer the timer to record into
+   * @param task the work to execute
+   * @return the future returned by the task
+   */
+  protected CompletableFuture<Message> recordTime(Timer timer, Task task) {
+    // Closing the context stops the timer, on both the normal and the exceptional path.
+    try (Timer.Context ignored = timer.time()) {
+      return task.run();
+    }
+  }
+
+  /**
+   * A unit of work producing a future {@link Message}; the argument type of
+   * {@code recordTime}. Declared as a functional interface so it can be written as a lambda.
+   */
+  @FunctionalInterface
+  protected interface Task {
+    /**
+     * @return the future produced by the work
+     */
+    CompletableFuture<Message> run();
+  }
+
 }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index 29fe962..07283b0 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -17,7 +17,9 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -26,6 +28,11 @@ import org.apache.ratis.util.AutoCloseableLock;
 import java.util.function.Consumer;
 
 public interface RaftStorageTestUtils {
+
+  static String getLogFlushTimeMetric(RaftPeerId serverId) {
+    return serverId + "." + RatisMetrics.RATIS_LOG_WORKER_METRICS_CONTEXT + 
".flush-time";
+  }
+
   static void printLog(RaftLog log, Consumer<String> println) {
     if (log == null) {
       println.accept("log == null");
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 81ad927..6660896 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -23,10 +23,10 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.JavaUtils;
@@ -99,20 +99,21 @@ public class TestRaftLogMetrics extends BaseTest
   }
 
   static void assertFlushCount(RaftServerImpl server) throws Exception {
-      final String flushTimeMetric = SegmentedRaftLogTestUtils.getLogFlushTimeMetric(server.getId());
-      Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
-      Assert.assertNotNull(tm);
-
-      final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
-      final int expectedFlush = stateMachine.getFlushCount();
-
-      Assert.assertEquals(expectedFlush, tm.getCount());
-      Assert.assertTrue(tm.getMeanRate() > 0);
-
-      // Test jmx
-      ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
-      Assert.assertEquals(expectedFlush,
-          ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
-              .intValue());
+    // Fetch the flush timer from this server's log worker registry.
+    final String serverName = server.getId().toString();
+    final Timer flushTimer =
+        (Timer) RatisMetrics.getMetricRegistryForLogWorker(serverName).get("flush-time");
+    Assert.assertNotNull(flushTimer);
+
+    // The timer's count must equal the flush count reported by the state machine.
+    final int expectedFlush = MetricsStateMachine.get(server).getFlushCount();
+    Assert.assertEquals(expectedFlush, flushTimer.getCount());
+    Assert.assertTrue(flushTimer.getMeanRate() > 0);
+
+    // Test jmx
+    final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
+    final ObjectName jmxName = new ObjectName("RatisCore", "name", flushTimeMetric);
+    final Object jmxCount =
+        ManagementFactory.getPlatformMBeanServer().getAttribute(jmxName, "Count");
+    Assert.assertEquals(expectedFlush, ((Long) jmxCount).intValue());
   }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java 
b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
index d7fc520..cfd45d5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
@@ -30,7 +30,8 @@ public class TestDataBlockingQueue {
 
   final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
   final int elementLimit = 10;
-  final DataBlockingQueue<Integer> q = new DataBlockingQueue<>(null, 
byteLimit, elementLimit, Integer::intValue);
+  final DataBlockingQueue<Integer> q =
+      new DataBlockingQueue<>(null, byteLimit, elementLimit, 
Integer::intValue);
 
   final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
   final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);

Reply via email to