Author: llu
Date: Tue Jul 12 10:22:31 2011
New Revision: 1145523

URL: http://svn.apache.org/viewvc?rev=1145523&view=rev
Log:
HADOOP-7324. Ganglia support for metrics v2. (Priyo Mustafi via llu)

Added:
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
    
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
    
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/
    
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/
    
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
    
hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties
    
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
    
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Tue Jul 12 
10:22:31 2011
@@ -9,6 +9,8 @@ Release 0.20.204.0 - unreleased
     scripts for easy one node cluster configuration and user creation.
     (Eric Yang via omalley)
 
+    HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
+
   BUG FIXES
 
     MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup

Modified: 
hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties 
(original)
+++ 
hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties 
Tue Jul 12 10:22:31 2011
@@ -14,3 +14,33 @@
 #maptask.sink.file.filename=maptask-metrics.out
 
 #reducetask.sink.file.filename=reducetask-metrics.out
+
+
+#
+# Below are for sending metrics to Ganglia
+#
+# for Ganglia 3.0 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
+#
+# for Ganglia 3.1 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
+
+# *.sink.ganglia.period=10
+
+# default for supportsparse is false
+# *.sink.ganglia.supportsparse=true
+
+#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
+#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
+
+#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#jobtracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#tasktracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#maptask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#reducetask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649

Added: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * This is the base class for Ganglia sink classes using metrics2. Much of the
+ * code has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
+ * As per the documentation, sink implementations don't have to worry about
+ * thread safety. Hence the code wasn't written for thread safety and should
+ * be modified in case the above assumption changes in the future.
+ */
+public abstract class AbstractGangliaSink implements MetricsSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  /*
+   * Output of "gmetric --help" showing allowable values
+   * -t, --type=STRING
+   *     Either string|int8|uint8|int16|uint16|int32|uint32|float|double
+   * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
+   *     (default='')
+   * -s, --slope=STRING Either zero|positive|negative|both
+   *     (default='both')
+   * -x, --tmax=INT The maximum time in seconds between gmetric calls
+   *     (default='60')
+   */
+  public static final String DEFAULT_UNITS = "";
+  public static final int DEFAULT_TMAX = 60;
+  public static final int DEFAULT_DMAX = 0;
+  public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
+  public static final int DEFAULT_PORT = 8649;
+  public static final String SERVERS_PROPERTY = "servers";
+  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
+  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
+  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
+  public static final String EQUAL = "=";
+
+  private String hostName = "UNKNOWN.example.com";
+  private DatagramSocket datagramSocket;
+  private List<? extends SocketAddress> metricsServers;
+  private byte[] buffer = new byte[BUFFER_SIZE];
+  private int offset;
+  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
+
+  /**
+   * Used for visiting Metrics
+   */
+  protected final GangliaMetricVisitor gangliaMetricVisitor =
+    new GangliaMetricVisitor();
+
+  private SubsetConfiguration conf;
+  private Map<String, GangliaConf> gangliaConfMap;
+  private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
+
+  /**
+   * ganglia slope values which equal the ordinal
+   */
+  public enum GangliaSlope {
+    zero,       // 0
+    positive,   // 1
+    negative,   // 2
+    both        // 3
+  };
+
+  /**
+   * define enum for the various types of conf
+   */
+  public enum GangliaConfType {
+    slope, units, dmax, tmax
+  };
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
+   * .SubsetConfiguration)
+   */
+  public void init(SubsetConfiguration conf) {
+    LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
+
+    this.conf = conf;
+
+    // Take the hostname from the DNS class.
+    if (conf.getString("slave.host.name") != null) {
+      hostName = conf.getString("slave.host.name");
+    } else {
+      try {
+        hostName = DNS.getDefaultHost(
+            conf.getString("dfs.datanode.dns.interface", "default"),
+            conf.getString("dfs.datanode.dns.nameserver", "default"));
+      } catch (UnknownHostException uhe) {
+        LOG.error(uhe);
+        hostName = "UNKNOWN.example.com";
+      }
+    }
+
+    // load the ganglia servers from properties
+    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
+        DEFAULT_PORT);
+
+    // extract the Ganglia conf per metrics
+    gangliaConfMap = new HashMap<String, GangliaConf>();
+    loadGangliaConf(GangliaConfType.units);
+    loadGangliaConf(GangliaConfType.tmax);
+    loadGangliaConf(GangliaConfType.dmax);
+    loadGangliaConf(GangliaConfType.slope);
+
+    try {
+      datagramSocket = new DatagramSocket();
+    } catch (SocketException se) {
+      LOG.error(se);
+    }
+
+    // see if sparseMetrics is supported. Default is false
+    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
+        SUPPORT_SPARSE_METRICS_DEFAULT);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.metrics2.MetricsSink#flush()
+   */
+  public void flush() {
+    // nothing to do as we are not buffering data
+  }
+
+  /**
+   * Load the configurations for a conf type
+   *
+   * @param gtype Only load metrics for given type
+   */
+  private void loadGangliaConf(GangliaConfType gtype) {
+    String propertyarr[] = conf.getStringArray(gtype.name());
+    if (propertyarr != null && propertyarr.length > 0) {
+      for (String metricNValue : propertyarr) {
+        String metricNValueArr[] = metricNValue.split(EQUAL);
+        if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
+          LOG.error("Invalid propertylist for " + gtype.name());
+        }
+
+        String metricName = metricNValueArr[0].trim();
+        String metricValue = metricNValueArr[1].trim();
+        GangliaConf gconf = gangliaConfMap.get(metricName);
+        if (gconf == null) {
+          gconf = new GangliaConf();
+          gangliaConfMap.put(metricName, gconf);
+        }
+
+        switch (gtype) {
+        case units:
+          gconf.setUnits(metricValue);
+          break;
+        case dmax:
+          gconf.setDmax(Integer.parseInt(metricValue));
+          break;
+        case tmax:
+          gconf.setTmax(Integer.parseInt(metricValue));
+          break;
+        case slope:
+          gconf.setSlope(GangliaSlope.valueOf(metricValue));
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Lookup GangliaConf from cache. If not found, return default values
+   *
+   * @param metricName
+   * @return looked up GangliaConf
+   */
+  protected GangliaConf getGangliaConfForMetric(String metricName) {
+    GangliaConf gconf = gangliaConfMap.get(metricName);
+
+    return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
+  }
+
+  /**
+   * @return the hostName
+   */
+  protected String getHostName() {
+    return hostName;
+  }
+
+  /**
+   * Puts a string into the buffer by first writing the size of the string as an
+   * int, followed by the bytes of the string, padded if necessary to a multiple
+   * of 4.
+   * @param s the string to be written to buffer at offset location
+   */
+  protected void xdr_string(String s) {
+    byte[] bytes = s.getBytes();
+    int len = bytes.length;
+    xdr_int(len);
+    System.arraycopy(bytes, 0, buffer, offset, len);
+    offset += len;
+    pad();
+  }
+
+  /**
+   * Pads the buffer with zero bytes up to the nearest multiple of 4.
+   */
+  private void pad() {
+    int newOffset = ((offset + 3) / 4) * 4;
+    while (offset < newOffset) {
+      buffer[offset++] = 0;
+    }
+  }
+
+  /**
+   * Puts an integer into the buffer as 4 bytes, big-endian.
+   */
+  protected void xdr_int(int i) {
+    buffer[offset++] = (byte) ((i >> 24) & 0xff);
+    buffer[offset++] = (byte) ((i >> 16) & 0xff);
+    buffer[offset++] = (byte) ((i >> 8) & 0xff);
+    buffer[offset++] = (byte) (i & 0xff);
+  }
+
+  /**
+   * Sends Ganglia Metrics to the configured hosts
+   * @throws IOException
+   */
+  protected void emitToGangliaHosts() throws IOException {
+    try {
+      for (SocketAddress socketAddress : metricsServers) {
+        DatagramPacket packet =
+          new DatagramPacket(buffer, offset, socketAddress);
+        datagramSocket.send(packet);
+      }
+    } finally {
+      // reset the buffer for the next metric to be built
+      offset = 0;
+    }
+  }
+
+  /**
+   * @return whether sparse metrics are supported
+   */
+  protected boolean isSupportSparseMetrics() {
+    return supportSparseMetrics;
+  }
+
+  /**
+   * Used only by unit test
+   * @param datagramSocket the datagramSocket to set.
+   */
+  void setDatagramSocket(DatagramSocket datagramSocket) {
+    this.datagramSocket = datagramSocket;
+  }
+}

Added: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import 
org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * class which is used to store ganglia properties
+ */
+class GangliaConf {
+  private String units = AbstractGangliaSink.DEFAULT_UNITS;
+  private GangliaSlope slope;
+  private int dmax = AbstractGangliaSink.DEFAULT_DMAX;
+  private int tmax = AbstractGangliaSink.DEFAULT_TMAX;
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("unit=").append(units).append(", slope=").append(slope)
+        .append(", dmax=").append(dmax).append(", tmax=").append(tmax);
+    return buf.toString();
+  }
+
+  /**
+   * @return the units
+   */
+  String getUnits() {
+    return units;
+  }
+
+  /**
+   * @param units the units to set
+   */
+  void setUnits(String units) {
+    this.units = units;
+  }
+
+  /**
+   * @return the slope
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  /**
+   * @param slope the slope to set
+   */
+  void setSlope(GangliaSlope slope) {
+    this.slope = slope;
+  }
+
+  /**
+   * @return the dmax
+   */
+  int getDmax() {
+    return dmax;
+  }
+
+  /**
+   * @param dmax the dmax to set
+   */
+  void setDmax(int dmax) {
+    this.dmax = dmax;
+  }
+
+  /**
+   * @return the tmax
+   */
+  int getTmax() {
+    return tmax;
+  }
+
+  /**
+   * @param tmax the tmax to set
+   */
+  void setTmax(int tmax) {
+    this.tmax = tmax;
+  }
+}
\ No newline at end of file

Added: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import 
org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Since implementations of Metric are not public, a visitor is used to
+ * figure out the type and slope of the metric. Counters have "positive"
+ * slope.
+ */
+class GangliaMetricVisitor implements MetricsVisitor {
+  private static final String INT32 = "int32";
+  private static final String FLOAT = "float";
+  private static final String DOUBLE = "double";
+
+  private String type;
+  private GangliaSlope slope;
+
+  /**
+   * @return the type of a visited metric
+   */
+  String getType() {
+    return type;
+  }
+
+  /**
+   * @return the slope of a visited metric. Slope is positive for counters
+   * and null for others
+   */
+  GangliaSlope getSlope() {
+    return slope;
+  }
+
+  @Override
+  public void gauge(MetricGauge<Integer> metric, int value) {
+    // MetricGaugeInt.class ==> "int32"
+    type = INT32;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void counter(MetricCounter<Integer> metric, int value) {
+    // MetricCounterInt.class ==> "int32"
+    type = INT32;
+
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+
+  @Override
+  public void gauge(MetricGauge<Long> metric, long value) {
+    // MetricGaugeLong.class ==> "float"
+    type = FLOAT;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void counter(MetricCounter<Long> metric, long value) {
+    // MetricCounterLong.class ==> "float"
+    type = FLOAT;
+
+    // counters have positive slope
+    slope = GangliaSlope.positive;
+  }
+
+  @Override
+  public void gauge(MetricGauge<Float> metric, float value) {
+    // MetricGaugeFloat.class ==> "float"
+    type = FLOAT;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+
+  @Override
+  public void gauge(MetricGauge<Double> metric, double value) {
+    // MetricGaugeDouble.class ==> "double"
+    type = DOUBLE;
+    slope = null; // set to null as cannot figure out from Metric
+  }
+}

Added: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.util.MetricsCache;
+import org.apache.hadoop.metrics2.util.MetricsCache.Record;
+
+/**
+ * This code supports Ganglia 3.0
+ *
+ */
+public class GangliaSink30 extends AbstractGangliaSink {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  protected MetricsCache metricsCache = new MetricsCache();
+
+  /*
+   *
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.metrics2.MetricsSink#putMetrics(org.apache.hadoop.metrics2
+   * .MetricsRecord)
+   */
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    // This method handles both cases: Ganglia configured for dense publish
+    // of metrics, or for sparse (only on change) publish of metrics
+    try {
+      String recordName = record.name();
+      String contextName = record.context();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(contextName);
+      sb.append('.');
+      sb.append(recordName);
+
+      String groupName = sb.toString();
+      sb.append('.');
+      int sbBaseLen = sb.length();
+
+      String type = null;
+      GangliaSlope slopeFromMetric = null;
+      GangliaSlope calculatedSlope = null;
+      Record cachedMetrics = null;
+      if (!isSupportSparseMetrics()) {
+        // for sending dense metrics, update metrics cache
+        // and get the updated data
+        cachedMetrics = metricsCache.update(record);
+
+        if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
+          for (Map.Entry<String, Metric> entry : 
cachedMetrics.metricsEntrySet()) {
+            Metric metric = entry.getValue();
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(),
+                gConf, calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      } else {
+        // we support sparse updates
+
+        Collection<Metric> metrics = (Collection<Metric>) record.metrics();
+        if (metrics.size() > 0) {
+          // we got metrics. so send the latest
+          for (Metric metric : record.metrics()) {
+            sb.append(metric.name());
+            String name = sb.toString();
+
+            // visit the metric to identify the Ganglia type and slope
+            metric.visit(gangliaMetricVisitor);
+            type = gangliaMetricVisitor.getType();
+            slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+
+            GangliaConf gConf = getGangliaConfForMetric(name);
+            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+            // send metric to Ganglia
+            emitMetric(groupName, name, type, metric.value().toString(),
+                gConf, calculatedSlope);
+
+            // reset the length of the buffer for next iteration
+            sb.setLength(sbBaseLen);
+          }
+        }
+      }
+    } catch (IOException io) {
+      throw new MetricsException("Failed to putMetrics", io);
+    }
+  }
+
+
+  /**
+   * Calculate the slope from properties and metric
+   *
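+   * @param gConf the Ganglia configuration of the metric
+   * @param slopeFromMetric the slope derived from the metric; may be null
+   * @return slope from gConf if set, else slopeFromMetric, else DEFAULT_SLOPE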
+   */
+  private GangliaSlope calculateSlope(GangliaConf gConf, GangliaSlope 
slopeFromMetric) {
+    if (gConf.getSlope() != null) {
+      // if slope has been specified in properties, use that
+      return gConf.getSlope();
+    } else if (slopeFromMetric != null) {
+      // slope not specified in properties, use derived from Metric
+      return slopeFromMetric;
+    } else {
+      return DEFAULT_SLOPE;
+    }
+  }
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in
+   * order to keep it in sync.
+   *
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope)
+    throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name + " was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value " + 
value
+          + ", slope " + gSlope.name()+ " from hostname " + getHostName());
+    }
+
+    xdr_int(0); // metric_user_defined
+    xdr_string(type);
+    xdr_string(name);
+    xdr_string(value);
+    xdr_string(gConf.getUnits());
+    xdr_int(gSlope.ordinal());
+    xdr_int(gConf.getTmax());
+    xdr_int(gConf.getDmax());
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}

Added: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This code supports Ganglia 3.1
+ *
+ */
+public class GangliaSink31 extends GangliaSink30 {
+
+  public final Log LOG = LogFactory.getLog(this.getClass());
+
+  /**
+   * The method sends metrics to Ganglia servers. The method has been taken from
+   * org.apache.hadoop.metrics.ganglia.GangliaContext31 with minimal changes in
+   * order to keep it in sync.
+   *
+   * @param groupName The group name of the metric
+   * @param name The metric name
+   * @param type The type of the metric
+   * @param value The value of the metric
+   * @param gConf The GangliaConf for this metric
+   * @param gSlope The slope for this metric
+   * @throws IOException
+   */
+  protected void emitMetric(String groupName, String name, String type,
+      String value, GangliaConf gConf, GangliaSlope gSlope)
+    throws IOException {
+
+    if (name == null) {
+      LOG.warn("Metric was emitted with no name.");
+      return;
+    } else if (value == null) {
+      LOG.warn("Metric name " + name +" was emitted with a null value.");
+      return;
+    } else if (type == null) {
+      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Emitting metric " + name + ", type " + type + ", value " + 
value
+          + ", slope " + gSlope.name()+ " from hostname " + getHostName());
+    }
+
+    // The following XDR recipe was done through a careful reading of
+    // gm_protocol.x in Ganglia 3.1 and carefully examining the output of
+    // the gmetric utility with strace.
+
+    // First we send out a metadata message
+    xdr_int(128);               // metric_id = metadata_msg
+    xdr_string(getHostName());       // hostname
+    xdr_string(name);           // metric name
+    xdr_int(0);                 // spoof = False
+    xdr_string(type);           // metric type
+    xdr_string(name);           // metric name
+    xdr_string(gConf.getUnits());    // units
+    xdr_int(gSlope.ordinal());  // slope
+    xdr_int(gConf.getTmax());        // tmax, the maximum time between metrics
+    xdr_int(gConf.getDmax());        // dmax, lifetime of the metric in seconds
+    xdr_int(1);                 /*Num of the entries in extra_value field for
+                                  Ganglia 3.1.x*/
+    xdr_string("GROUP");        /*Group attribute*/
+    xdr_string(groupName);      /*Group value*/
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+
+    // Now we send out a message with the actual value.
+    // Technically, we only need to send out the metadata message once for
+    // each metric, but I don't want to have to record which metrics we did and
+    // did not send.
+    xdr_int(133);         // we are sending a string value
+    xdr_string(getHostName()); // hostName
+    xdr_string(name);     // metric name
+    xdr_int(0);           // spoof = False
+    xdr_string("%s");     // format field
+    xdr_string(value);    // metric value
+
+    // send the metric to Ganglia hosts
+    emitToGangliaHosts();
+  }
+}

Modified: 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
 Tue Jul 12 10:22:31 2011
@@ -45,7 +45,7 @@ public class MetricsCache {
    */
   public static class Record {
     final Map<String, String> tags = new LinkedHashMap<String, String>();
-    final Map<String, Number> metrics = new LinkedHashMap<String, Number>();
+    final Map<String, Metric> metrics = new LinkedHashMap<String, Metric>();
 
     /**
      * Get the tag value
@@ -62,6 +62,16 @@ public class MetricsCache {
      * @return the metric value
      */
     public Number getMetric(String key) {
+      Metric metric = metrics.get(key);
+      return metric != null ? metric.value() : null;
+    }
+
+    /**
+     * Get the cached metric object itself rather than only its value
+     * @param key name of the metric
+     * @return the cached Metric, or null when the key is absent
+     */
+    public Metric getMetricInstance(String key) {
       return metrics.get(key);
     }
 
@@ -69,6 +79,18 @@ public class MetricsCache {
      * @return entry set of metrics
      */
     public Set<Map.Entry<String, Number>> metrics() {
+      Map<String, Number> map =
+        new LinkedHashMap<String,Number>(metrics.size());
+      for (Map.Entry<String, Metric> mapEntry : metrics.entrySet()) {
+        map.put(mapEntry.getKey(), mapEntry.getValue().value());
+      }
+      return map.entrySet();
+    }
+
+    /**
+     * @return entry set of metrics
+     */
+    public Set<Map.Entry<String, Metric>> metricsEntrySet() {
       return metrics.entrySet();
     }
   }
@@ -93,7 +115,7 @@ public class MetricsCache {
       recMap.put(tags, rec);
     }
     for (Metric m : mr.metrics()) {
-      rec.metrics.put(m.name(), m.value());
+      rec.metrics.put(m.name(), m);
     }
     if (includingTags) {
       // mostly for some sinks that include tags as part of a dense schema
@@ -119,5 +141,4 @@ public class MetricsCache {
     if (tmap == null) return null;
     return tmap.get(tags);
   }
-
 }

Added: 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.lib.AbstractMetricsSource;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaMetricsTestHelper;
+import org.junit.Test;
+
+public class TestGangliaMetrics {
+  public static final Log LOG = LogFactory.getLog(TestMetricsSystemImpl.class);
+  private final String[] expectedMetrics =
+    {"test.s1rec.c1",
+     "test.s1rec.g1",
+     "test.s1rec.s1_num_ops",
+     "test.s1rec.s1_avg_time"};
+
+  @Test public void testGangliaMetrics2() throws Exception {
+    ConfigBuilder cb = new ConfigBuilder().add("default.period", 10)
+        .add("test.sink.gsink30.context", "test") // filter out only "test"
+        .add("test.sink.gsink31.context", "test") // filter out only "test"
+        .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+
+    MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+    ms.start();
+    TestSource s1 = ms.register("s1", "s1 desc", new TestSource("s1rec"));
+    s1.s1.add(0);
+
+    AbstractGangliaSink gsink30 = new GangliaSink30();
+    gsink30.init(cb.subset("test"));
+    MockDatagramSocket mockds30 = new MockDatagramSocket();
+    GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
+
+    AbstractGangliaSink gsink31 = new GangliaSink31();
+    gsink31.init(cb.subset("test"));
+    MockDatagramSocket mockds31 = new MockDatagramSocket();
+    GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
+
+    ms.register("gsink30", "gsink30 desc", gsink30);
+    ms.register("gsink31", "gsink31 desc", gsink31);
+    ms.onTimerEvent();  // trigger something interesting
+    ms.stop();
+
+    // check GanfliaSink30 data
+    checkMetrics(mockds30.getCapturedSend());
+
+    // check GanfliaSink31 data
+    checkMetrics(mockds31.getCapturedSend());
+  }
+
+
+  private void checkMetrics(List<byte[]> bytearrlist) {
+    boolean[] foundMetrics = new boolean[expectedMetrics.length];
+    for (byte[] bytes : bytearrlist) {
+      String binaryStr = new String(bytes);
+      for (int index = 0; index < expectedMetrics.length; index++) {
+        if (binaryStr.indexOf(expectedMetrics[index]) >= 0) {
+          foundMetrics[index] = true;
+          break;
+        }
+      }
+    }
+
+    for (int index = 0; index < foundMetrics.length; index++) {
+      if (!foundMetrics[index]) {
+        assertTrue("Missing metrics: " + expectedMetrics[index], false);
+      }
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static class TestSource extends AbstractMetricsSource {
+    final MetricMutableCounterLong c1;
+    final MetricMutableGaugeLong g1;
+    final MetricMutableStat s1;
+
+    TestSource(String name) {
+      super(name);
+      registry.setContext("test");
+      c1 = registry.newCounter("c1", "c1 desc", 1L);
+      g1 = registry.newGauge("g1", "g1 desc", 2L);
+      s1 = registry.newStat("s1", "s1 desc", "ops", "time");
+    }
+  }
+
+  /**
+   * This class is used to capture data send to Ganglia servers.
+   *
+   * Initial attempt was to use mockito to mock and capture but
+   * while testing figured out that mockito is keeping the reference
+   * to the byte array and since the sink code reuses the byte array
+   * hence all the captured byte arrays were pointing to one instance.
+   */
+  private static class MockDatagramSocket extends DatagramSocket {
+    private ArrayList<byte[]> capture;
+
+    public MockDatagramSocket() throws SocketException {
+      capture = new  ArrayList<byte[]>();
+    }
+    /* (non-Javadoc)
+     * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
+     */
+    @Override
+    public void send(DatagramPacket p) throws IOException {
+      byte[] bytes = new byte[p.getLength()];
+      System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
+      capture.add(bytes);
+    }
+
+    ArrayList<byte[]> getCapturedSend() {
+      return capture;
+    }
+  }
+}

Added: 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java?rev=1145523&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
 (added)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
 Tue Jul 12 10:22:31 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.net.DatagramSocket;
+
+/**
+ * Helper class in the same package as ganglia sinks to be used by unit tests
+ */
+public class GangliaMetricsTestHelper {
+  /**
+   * Installs the given socket on a Ganglia sink by delegating to the sink's
+   * own setDatagramSocket method (package private, per the original note),
+   * so that unit tests living in other packages can swap the socket.
+   *
+   * @param gangliaSink the sink that receives the socket
+   * @param datagramSocket the socket handed to the sink
+   */
+  public static void setDatagramSocket(
+      AbstractGangliaSink gangliaSink, DatagramSocket datagramSocket) {
+    gangliaSink.setDatagramSocket(datagramSocket);
+  }
+}

Modified: 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
 Tue Jul 12 10:22:31 2011
@@ -47,6 +47,9 @@ public class TestMetricsCache {
     assertEquals("same record size", cr.metrics.size(),
                  ((Collection<Metric>)mr.metrics()).size());
     assertEquals("same metric value", 0, cr.getMetric("m"));
+    assertNotNull("metric not null", cr.getMetricInstance("m"));
+    assertEquals("new metric value", 0, cr.getMetricInstance("m").value());
+
 
     MetricsRecord mr2 = makeRecord("r",
         Arrays.asList(makeTag("t", "tv")),
@@ -54,8 +57,16 @@ public class TestMetricsCache {
     cr = cache.update(mr2);
     assertEquals("contains 3 metric", 3, cr.metrics.size());
     assertEquals("updated metric value", 2, cr.getMetric("m"));
+    assertNotNull("metric not null", cr.getMetricInstance("m"));
+    assertEquals("new metric value", 2, cr.getMetricInstance("m").value());
+
     assertEquals("old metric value", 1, cr.getMetric("m1"));
+    assertNotNull("metric not null", cr.getMetricInstance("m1"));
+    assertEquals("new metric value", 1, cr.getMetricInstance("m1").value());
+
     assertEquals("new metric value", 42, cr.getMetric("m2"));
+    assertNotNull("metric not null", cr.getMetricInstance("m2"));
+    assertEquals("new metric value", 42, cr.getMetricInstance("m2").value());
 
     MetricsRecord mr3 = makeRecord("r",
         Arrays.asList(makeTag("t", "tv3")), // different tag value
@@ -63,6 +74,9 @@ public class TestMetricsCache {
     cr = cache.update(mr3); // should get a new record
     assertEquals("contains 1 metric", 1, cr.metrics.size());
     assertEquals("updated metric value", 3, cr.getMetric("m3"));
+    assertNotNull("metric not null", cr.getMetricInstance("m3"));
+    assertEquals("new metric value", 3, cr.getMetricInstance("m3").value());
+
     // tags cache should be empty so far
     assertEquals("no tags", 0, cr.tags.size());
     // until now
@@ -70,6 +84,8 @@ public class TestMetricsCache {
     assertEquals("Got 1 tag", 1, cr.tags.size());
     assertEquals("Tag value", "tv3", cr.getTag("t"));
     assertEquals("Metric value", 3, cr.getMetric("m3"));
+    assertNotNull("metric not null", cr.getMetricInstance("m3"));
+    assertEquals("new metric value", 3, cr.getMetricInstance("m3").value());
   }
 
   @Test public void testGet() {
@@ -85,6 +101,8 @@ public class TestMetricsCache {
     assertNotNull("Got record", cr);
     assertEquals("contains 1 metric", 1, cr.metrics.size());
     assertEquals("new metric value", 1, cr.getMetric("m"));
+    assertNotNull("metric not null", cr.getMetricInstance("m"));
+    assertEquals("new metric value", 1, cr.getMetricInstance("m").value());
   }
 
   private MetricsRecord makeRecord(String name, Collection<MetricsTag> tags,


Reply via email to