Repository: incubator-ratis
Updated Branches:
  refs/heads/master 77ffa18ba -> 2fb731f91


RATIS-148. Add metric for log flush latency. Contributed by jitendra.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2fb731f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2fb731f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2fb731f9

Branch: refs/heads/master
Commit: 2fb731f916c9888b6d7dcc348eee05a5887a316e
Parents: 77ffa18
Author: Jitendra Pandey <[email protected]>
Authored: Tue Nov 21 17:18:56 2017 -0800
Committer: Jitendra Pandey <[email protected]>
Committed: Tue Nov 21 17:18:56 2017 -0800

----------------------------------------------------------------------
 ratis-common/pom.xml                            |   7 ++
 .../ratis/metrics/RatisMetricsRegistry.java     |  35 ++++++
 .../ratis/server/storage/RaftLogWorker.java     |  15 ++-
 .../apache/ratis/server/TestRaftLogMetrics.java | 106 +++++++++++++++++++
 4 files changed, 162 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fb731f9/ratis-common/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index 33286d1..9f4017e 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -49,6 +49,13 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Dropwizard Metrics core; presumably the source of the com.codahale.metrics classes imported by this change -->
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.2.5</version>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fb731f9/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java b/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
new file mode 100644
index 0000000..c5601ed
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/metrics/RatisMetricsRegistry.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.metrics;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+
+public class RatisMetricsRegistry {
+  // Holds the single MetricRegistry instance returned by getRegistry().
+  private static final MetricRegistry metricsRegistry = new MetricRegistry();
+  // On class initialization, build and start a JMX reporter for that registry.
+  static {
+    JmxReporter.forRegistry(RatisMetricsRegistry.getRegistry()).build().start();
+  }
+
+  public static MetricRegistry getRegistry() {
+    return metricsRegistry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fb731f9/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 23d7c9a..0c68cdc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -17,7 +17,10 @@
  */
 package org.apache.ratis.server.storage;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -35,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.*;
+import java.util.function.Supplier;
 
 /**
  * This class takes the responsibility of all the raft log related I/O ops for a
@@ -55,6 +59,7 @@ class RaftLogWorker implements Runnable {
   private volatile LogOutputStream out;
   private final RaftServerImpl raftServer;
   private final StateMachine stateMachine;
+  private final Supplier<Timer> logFlushTimer;
 
   /**
    * The number of entries that have been written into the LogOutputStream but
@@ -89,6 +94,9 @@ class RaftLogWorker implements Runnable {
         RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
     this.workerThread = new Thread(this, name);
+    this.logFlushTimer = JavaUtils.memoize(() -> RatisMetricsRegistry.getRegistry()
+        .timer(MetricRegistry.name(RaftLogWorker.class, raftServer.getId().toString(),
+            "flush-time")));
   }
 
   void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -208,7 +216,12 @@ class RaftLogWorker implements Runnable {
   private void flushWrites() throws IOException {
     if (out != null) {
       LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
-      out.flush();
+      final Timer.Context timerContext = logFlushTimer.get().time(); // start timing this flush
+      try {
+        out.flush();
+      } finally { // stop the timer whether or not flush() throws
+        timerContext.stop();
+      }
       updateFlushedIndex();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2fb731f9/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
new file mode 100644
index 0000000..978800d
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server;
+
+import com.codahale.metrics.Timer;
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricsRegistry;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.LogUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+
+public class TestRaftLogMetrics {
+  // Checks the "flush-time" timer that RaftLogWorker registers for each server.
+  {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final int NUM_SERVERS = 3;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc
+      .FACTORY.newCluster(NUM_SERVERS, getProperties());
+
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private String getLogFlushTimeMetric(String serverId) {
+    return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.")
+        .append(serverId).append(".flush-time").toString();
+  }
+
+  @Test
+  public void testFlushMetric() throws Exception {
+    int numMsg = 2;
+    final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg);
+    // Send every message through one client; try-with-resources closes it afterwards.
+    try (final RaftClient client = cluster.createClient()) {
+      for (RaftTestUtil.SimpleMessage message : messages) {
+        client.send(message);
+      }
+    }
+    // Each server's timer must be present in the registry and readable over JMX.
+    for (RaftServerProxy rsp: cluster.getServers()) {
+      String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString());
+      Timer tm = RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
+      Assert.assertNotNull(tm);
+
+      // Number of log entries expected = numMsg + 1 entry for start-log-segment
+      int numExpectedLogEntries = numMsg + 1;
+
+      Assert.assertEquals(numExpectedLogEntries, tm.getCount());
+      Assert.assertTrue(tm.getMeanRate() > 0);
+
+      // Test jmx: read the same count from the MBean named metrics:name=<metric name>.
+      ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
+      Assert.assertEquals(numExpectedLogEntries,
+          ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
+              .intValue());
+    }
+  }
+
+}

Reply via email to