Author: sradia
Date: Mon Mar  1 21:36:23 2010
New Revision: 917737

URL: http://svn.apache.org/viewvc?rev=917737&view=rev
Log:
HADOOP-6599  Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics.
   (Suresh Srinivas via Sanjay Radia)


Added:
    
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
    
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Mon Mar  1 21:36:23 2010
@@ -1,4 +1,5 @@
 Hadoop Change Log
+# Add directory level at the storage directory
 
 Trunk (unreleased changes)
 
@@ -181,6 +182,9 @@
     HADOOP-6589. Provide better error messages when RPC authentication fails.
     (Kan Zhang via omalley)
 
+    HADOOP-6599. Split existing RpcMetrics into RpcMetrics & RpcDetailedMetrics.
+    (Suresh Srinivas via Sanjay Radia)
+
   OPTIMIZATIONS
 
     HADOOP-6467. Improve the performance on HarFileSystem.listStatus(..).

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Mon Mar  1 21:36:23 2010
@@ -64,6 +64,7 @@
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -172,7 +173,8 @@
                                                   // connections to nuke
                                                   //during a cleanup
   
-  protected RpcMetrics  rpcMetrics;
+  protected RpcMetrics rpcMetrics;
+  protected RpcDetailedMetrics rpcDetailedMetrics;
   
   private Configuration conf;
   private SecretManager<TokenIdentifier> secretManager;
@@ -1268,8 +1270,9 @@
             // its own message ordering.
             setupResponse(buf, call, (error == null) ? Status.SUCCESS
                 : Status.ERROR, value, errorClass, error);
-            // Discard the large buf and reset it back to
-            // smaller size to freeup heap
+            
+            // Discard the large buf and reset it back to smaller size 
+            // to free up heap
             if (buf.size() > maxRespSize) {
               LOG.warn("Large response size " + buf.size() + " for call "
                   + call.toString());
@@ -1336,6 +1339,8 @@
     this.port = listener.getAddress().getPort();    
     this.rpcMetrics = new RpcMetrics(serverName,
                           Integer.toString(this.port), this);
+    this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName,
+                            Integer.toString(this.port));
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
 
 
@@ -1450,6 +1455,9 @@
     if (this.rpcMetrics != null) {
       this.rpcMetrics.shutdown();
     }
+    if (this.rpcDetailedMetrics != null) {
+      this.rpcDetailedMetrics.shutdown();
+    }
   }
 
   /** Wait for the server to be stopped.
@@ -1540,11 +1548,15 @@
    *
    * @see WritableByteChannel#write(ByteBuffer)
    */
-  private static int channelWrite(WritableByteChannel channel, 
-                                  ByteBuffer buffer) throws IOException {
+  private int channelWrite(WritableByteChannel channel, 
+                           ByteBuffer buffer) throws IOException {
     
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.write(buffer) : channelIO(null, channel, buffer);
+    int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+                 channel.write(buffer) : channelIO(null, channel, buffer);
+    if (count > 0) {
+      rpcMetrics.sentBytes.inc(count);
+    }
+    return count;
   }
   
   
@@ -1556,11 +1568,15 @@
    * 
    * @see ReadableByteChannel#read(ByteBuffer)
    */
-  private static int channelRead(ReadableByteChannel channel, 
-                                 ByteBuffer buffer) throws IOException {
+  private int channelRead(ReadableByteChannel channel, 
+                          ByteBuffer buffer) throws IOException {
     
-    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.read(buffer) : channelIO(channel, null, buffer);
+    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+                channel.read(buffer) : channelIO(channel, null, buffer);
+    if (count > 0) {
+      rpcMetrics.receivedBytes.inc(count);
+    }
+    return count;
   }
   
   /**

Modified: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Mon Mar  1 21:36:23 2010
@@ -351,15 +351,15 @@
         rpcMetrics.rpcProcessingTime.inc(processingTime);
 
         MetricsTimeVaryingRate m =
-         (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
+         (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(call.getMethodName());
        if (m == null) {
          try {
            m = new MetricsTimeVaryingRate(call.getMethodName(),
-                                               rpcMetrics.registry);
+                                               rpcDetailedMetrics.registry);
          } catch (IllegalArgumentException iae) {
            // the metrics has been registered; re-fetch the handle
            LOG.info("Error register " + call.getMethodName(), iae);
-           m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
+           m = (MetricsTimeVaryingRate) rpcDetailedMetrics.registry.get(
                call.getMethodName());
          }
        }

Modified: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcActivityMBean.java Mon Mar  1 21:36:23 2010
@@ -55,7 +55,7 @@
  */
 
 public class RpcActivityMBean extends MetricsDynamicMBeanBase {
-  final private ObjectName mbeanName;
+  private final ObjectName mbeanName;
 
   /**
    * 
@@ -63,9 +63,8 @@
    * @param serviceName - the service name for the rpc service 
    * @param port - the rpc port.
    */
-  public RpcActivityMBean(final MetricsRegistry mr, final String serviceName, final String port) {
-
-    
+  public RpcActivityMBean(final MetricsRegistry mr, final String serviceName,
+      final String port) {
     super(mr, "Rpc layer statistics");
     mbeanName = MBeanUtil.registerMBean(serviceName,
           "RpcActivityForPort" + port, this);
@@ -76,5 +75,4 @@
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
   }
-
 }

Added: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java?rev=917737&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedActivityMBean.java Mon Mar  1 21:36:23 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsDynamicMBeanBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * 
+ * This is the JMX MBean for reporting the RPC layer Activity. The MBean is
+ * registered using the name
+ * "hadoop:service=<RpcServiceName>,name=RpcDetailedActivityForPort<port>"
+ * 
+ * Many of the activity metrics are sampled and averaged on an interval which
+ * can be specified in the metrics config file.
+ * <p>
+ * For the metrics that are sampled and averaged, one must specify a metrics
+ * context that does periodic update calls. Most metrics contexts do. The
+ * default Null metrics context however does NOT. So if you aren't using any
+ * other metrics context then you can turn on the viewing and averaging of
+ * sampled metrics by specifying the following two lines in the
+ * hadoop-metrics.properties file:
+ * 
+ * <pre>
+ *        rpc.class=org.apache.hadoop.metrics.spi.NullContextWithUpdateThread
+ *        rpc.period=10
+ * </pre>
+ *<p>
+ * Note that the metrics are collected regardless of the context used. The
+ * context with the update thread is used to average the data periodically.
+ * 
+ * Impl details: We use a dynamic mbean that gets the list of the metrics from
+ * the metrics registry passed as an argument to the constructor
+ */
+public class RpcDetailedActivityMBean extends MetricsDynamicMBeanBase {
+  private final ObjectName mbeanName;
+
+  /**
+   * @param mr - the metrics registry that has all the metrics
+   * @param serviceName - the service name for the rpc service
+   * @param port - the rpc port.
+   */
+  public RpcDetailedActivityMBean(final MetricsRegistry mr,
+      final String serviceName, final String port) {
+    super(mr, "Rpc layer detailed statistics");
+    mbeanName = MBeanUtil.registerMBean(serviceName,
+        "RpcDetailedActivityForPort" + port, this);
+  }
+
+  public void shutdown() {
+    if (mbeanName != null)
+      MBeanUtil.unregisterMBean(mbeanName);
+  }
+}
\ No newline at end of file

Added: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java?rev=917737&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java Mon Mar  1 21:36:23 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.metrics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsBase;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * 
+ * This class is for maintaining the various RPC method related statistics
+ * and publishing them through the metrics interfaces.
+ * This also registers the JMX MBean for RPC.
+ */
+public class RpcDetailedMetrics implements Updater {
+  public final MetricsRegistry registry = new MetricsRegistry();
+  private final MetricsRecord metricsRecord;
+  private static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class);
+  RpcDetailedActivityMBean rpcMBean;
+  
+  /**
+   * Statically added metric to expose at least one metric, without
+   * which other dynamically added metrics are not exposed over JMX.
+   */
+  final MetricsTimeVaryingRate getProtocolVersion = 
+    new MetricsTimeVaryingRate("getProtocolVersion", registry);
+  
+  public RpcDetailedMetrics(final String hostName, final String port) {
+    MetricsContext context = MetricsUtil.getContext("rpc");
+    metricsRecord = MetricsUtil.createRecord(context, "detailed-metrics");
+
+    metricsRecord.setTag("port", port);
+
+    LOG.info("Initializing RPC Metrics with hostName=" 
+        + hostName + ", port=" + port);
+
+    context.registerUpdater(this);
+    
+    // Need to clean up the interface to RpcMgt - don't need both metrics and server params
+    rpcMBean = new RpcDetailedActivityMBean(registry, hostName, port);
+  }
+  
+  
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdates() call.
+   */
+  public void doUpdates(final MetricsContext context) {
+    
+    synchronized (this) {
+      for (MetricsBase m : registry.getMetricsList()) {
+        m.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  public void shutdown() {
+    if (rpcMBean != null) 
+      rpcMBean.shutdown();
+  }
+}

Modified: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java Mon Mar  1 21:36:23 2010
@@ -28,6 +28,7 @@
 import org.apache.hadoop.metrics.util.MetricsIntValue;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
 /**
@@ -43,13 +44,14 @@
  *
  */
 public class RpcMetrics implements Updater {
-  public MetricsRegistry registry = new MetricsRegistry();
-  private MetricsRecord metricsRecord;
-  private Server myServer;
-  private static Log LOG = LogFactory.getLog(RpcMetrics.class);
+  private final MetricsRegistry registry = new MetricsRegistry();
+  private final MetricsRecord metricsRecord;
+  private final Server myServer;
+  private static final Log LOG = LogFactory.getLog(RpcMetrics.class);
   RpcActivityMBean rpcMBean;
   
-  public RpcMetrics(String hostName, String port, Server server) {
+  public RpcMetrics(final String hostName, final String port,
+      final Server server) {
     myServer = server;
     MetricsContext context = MetricsUtil.getContext("rpc");
     metricsRecord = MetricsUtil.createRecord(context, "metrics");
@@ -72,26 +74,30 @@
    *  -they can also be read directly - e.g. JMX does this.
    */
 
-  public MetricsTimeVaryingRate rpcQueueTime =
+  public final MetricsTimeVaryingLong receivedBytes = 
+         new MetricsTimeVaryingLong("ReceivedBytes", registry);
+  public final MetricsTimeVaryingLong sentBytes = 
+         new MetricsTimeVaryingLong("SentBytes", registry);
+  public final MetricsTimeVaryingRate rpcQueueTime =
           new MetricsTimeVaryingRate("RpcQueueTime", registry);
   public MetricsTimeVaryingRate rpcProcessingTime =
           new MetricsTimeVaryingRate("RpcProcessingTime", registry);
-  public MetricsIntValue numOpenConnections = 
+  public final MetricsIntValue numOpenConnections = 
           new MetricsIntValue("NumOpenConnections", registry);
-  public MetricsIntValue callQueueLen = 
+  public final MetricsIntValue callQueueLen = 
           new MetricsIntValue("callQueueLen", registry);
-  public MetricsTimeVaryingInt authenticationFailures = 
+  public final MetricsTimeVaryingInt authenticationFailures = 
           new MetricsTimeVaryingInt("rpcAuthenticationFailures", registry);
-  public MetricsTimeVaryingInt authenticationSuccesses = 
+  public final MetricsTimeVaryingInt authenticationSuccesses = 
           new MetricsTimeVaryingInt("rpcAuthenticationSuccesses", registry);
-  public MetricsTimeVaryingInt authorizationFailures = 
+  public final MetricsTimeVaryingInt authorizationFailures = 
           new MetricsTimeVaryingInt("rpcAuthorizationFailures", registry);
-  public MetricsTimeVaryingInt authorizationSuccesses = 
+  public final MetricsTimeVaryingInt authorizationSuccesses = 
          new MetricsTimeVaryingInt("rpcAuthorizationSuccesses", registry);
   /**
    * Push the metrics to the monitoring subsystem on doUpdate() call.
    */
-  public void doUpdates(MetricsContext context) {
+  public void doUpdates(final MetricsContext context) {
     
     synchronized (this) {
       // ToFix - fix server to use the following two metrics directly so

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=917737&r1=917736&r2=917737&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Mon Mar  1 21:36:23 2010
@@ -34,6 +34,9 @@
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.spi.NullContext;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -251,7 +254,26 @@
 
     stringResult = proxy.echo((String)null);
     assertEquals(stringResult, null);
-
+    
+    // Check rpcMetrics 
+    server.rpcMetrics.doUpdates(new NullContext());
+    
+    // Number 4 includes getProtocolVersion()
+    assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
+    assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0);
+    assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0);
+    
+    // Number of calls to echo method should be 2
+    server.rpcDetailedMetrics.doUpdates(new NullContext());
+    MetricsTimeVaryingRate metrics = 
+      (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("echo");
+    assertEquals(2, metrics.getPreviousIntervalNumOps());
+    
+    // Number of calls to ping method should be 1
+    metrics = 
+      (MetricsTimeVaryingRate)server.rpcDetailedMetrics.registry.get("ping");
+    assertEquals(1, metrics.getPreviousIntervalNumOps());
+    
     String[] stringResults = proxy.echo(new String[]{"foo","bar"});
     assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));
 


Reply via email to