Updated Branches:
  refs/heads/trunk 02fb21d93 -> 8ed06fe5f

GIRAPH-505: Metrics Updates (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8ed06fe5
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8ed06fe5
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8ed06fe5

Branch: refs/heads/trunk
Commit: 8ed06fe5f736d135ca4a7fb208a20f5844c15781
Parents: 02fb21d
Author: Nitay Joffe <[email protected]>
Authored: Wed Feb 6 14:30:03 2013 -0500
Committer: Nitay Joffe <[email protected]>
Committed: Wed Feb 6 19:13:43 2013 -0500

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/comm/netty/ByteCounter.java  |   16 ++--
 .../giraph/comm/netty/NettyWorkerClient.java       |   48 ++++++++-
 .../netty/NettyWorkerClientRequestProcessor.java   |   37 ++++++-
 .../requests/SendPartitionMutationsRequest.java    |    6 +
 .../org/apache/giraph/graph/ComputeCallable.java   |   47 +++++----
 .../org/apache/giraph/graph/GraphTaskManager.java  |    6 +-
 .../org/apache/giraph/master/BspServiceMaster.java |    2 +-
 .../apache/giraph/metrics/AggregatedMetrics.java   |   10 --
 .../org/apache/giraph/metrics/GiraphMetrics.java   |    7 --
 .../giraph/metrics/GiraphMetricsRegistry.java      |   51 +++++++---
 .../java/org/apache/giraph/metrics/MeterDesc.java  |   78 ++++++++++++++
 .../org/apache/giraph/metrics/MetricNames.java     |   81 +++++++++++++++
 .../giraph/metrics/SuperstepMetricsRegistry.java   |   15 ++--
 .../java/org/apache/giraph/metrics/TimerDesc.java  |   72 +++++++++++++
 .../giraph/metrics/WorkerSuperstepMetrics.java     |    6 +-
 .../java/org/apache/giraph/utils/MemoryUtils.java  |   11 +-
 .../org/apache/giraph/worker/BspServiceWorker.java |    2 +-
 .../giraph/worker/EdgeInputSplitsCallable.java     |   28 +++---
 .../apache/giraph/worker/InputSplitsCallable.java  |   26 ++++-
 .../giraph/worker/VertexInputSplitsCallable.java   |   63 +++++------
 21 files changed, 471 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 05bcf91..c6e2b75 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-505: Metrics Updates (nitay)
+
   GIRAPH-506: Concurrency issue - response can arrive before request is added 
to the outstanding map (majakabiljo)
 
   GIRAPH-501: WorkerObserver (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
index 825e383..6bba199 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.comm.netty;
 
 import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MeterDesc;
+import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.time.SystemTime;
@@ -36,7 +38,6 @@ import com.yammer.metrics.core.NoOpHistogram;
 import com.yammer.metrics.core.NoOpMeter;
 
 import java.text.DecimalFormat;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -86,12 +87,13 @@ public class ByteCounter extends SimpleChannelHandler 
implements
 
   @Override
   public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
-    sentRequestsMeter = superstepMetrics.getMeter("sent-requests", "requests",
-        TimeUnit.SECONDS);
-    sentBytesHist = superstepMetrics.getHistogram("sent-bytes", false);
-    receivedRequestsMeter = superstepMetrics.getMeter("received-requests",
-        "request", TimeUnit.SECONDS);
-    receivedBytesHist = superstepMetrics.getHistogram("received-bytes", false);
+    sentRequestsMeter = superstepMetrics.getMeter(MeterDesc.SENT_REQUESTS);
+    sentBytesHist = superstepMetrics.getUniformHistogram(
+        MetricNames.SENT_BYTES);
+    receivedRequestsMeter = superstepMetrics.getMeter(
+        MeterDesc.RECEIVED_REQUESTS);
+    receivedBytesHist = superstepMetrics.getUniformHistogram(
+        MetricNames.RECEIVED_BYTES);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 9e6ed66..9c09524 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -18,22 +18,30 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
-import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.yammer.metrics.core.Counter;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Takes users facing APIs in {@link WorkerClient} and implements them
@@ -47,7 +55,7 @@ import java.util.List;
 @SuppressWarnings("rawtypes")
 public class NettyWorkerClient<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable> implements
-    WorkerClient<I, V, E, M> {
+    WorkerClient<I, V, E, M>, ResetSuperstepMetricsObserver {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class);
   /** Hadoop configuration */
@@ -57,6 +65,10 @@ public class NettyWorkerClient<I extends WritableComparable,
   /** Centralized service, needed to get vertex ranges */
   private final CentralizedServiceWorker<I, V, E, M> service;
 
+  // Metrics
+  /** Per-superstep, per-request counters */
+  private final Map<RequestType, Counter> superstepRequestCounters;
+
   /**
    * Only constructor.
    *
@@ -72,6 +84,31 @@ public class NettyWorkerClient<I extends WritableComparable,
         new NettyClient(context, configuration, service.getWorkerInfo());
     this.conf = configuration;
     this.service = service;
+    this.superstepRequestCounters = Maps.newHashMap();
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry metrics) {
+    superstepRequestCounters.clear();
+    superstepRequestCounters.put(RequestType.SEND_VERTEX_REQUEST,
+        metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS));
+    superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST,
+        metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS));
+    superstepRequestCounters.put(
+        RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST,
+        metrics.getCounter(
+            MetricNames.SEND_PARTITION_CURRENT_MESSAGES_REQUESTS));
+    superstepRequestCounters.put(RequestType.SEND_PARTITION_MUTATIONS_REQUEST,
+        metrics.getCounter(MetricNames.SEND_PARTITION_MUTATIONS_REQUESTS));
+    superstepRequestCounters.put(RequestType.SEND_WORKER_AGGREGATORS_REQUEST,
+        metrics.getCounter(MetricNames.SEND_WORKER_AGGREGATORS_REQUESTS));
+    
superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST,
+        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_MASTER_REQUESTS));
+    superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST,
+        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_OWNER_REQUESTS));
+    
superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST,
+        metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS));
   }
 
   public CentralizedServiceWorker<I, V, E, M> getService() {
@@ -100,6 +137,10 @@ public class NettyWorkerClient<I extends 
WritableComparable,
   @Override
   public void sendWritableRequest(Integer destTaskId,
                                   WritableRequest request) {
+    Counter counter = superstepRequestCounters.get(request.getType());
+    if (counter != null) {
+      counter.inc();
+    }
     nettyClient.sendWritableRequest(destTaskId, request);
   }
 
@@ -134,5 +175,6 @@ else[HADOOP_NON_SECURE]*/
   public void authenticate() {
     nettyClient.authenticate();
   }
+
 /*end[HADOOP_NON_SECURE]*/
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index 2a4ee8d..d4e919e 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -37,7 +37,8 @@ import 
org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.ValueGauge;
+import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -49,6 +50,10 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.util.PercentGauge;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -91,8 +96,10 @@ public class NettyWorkerClientRequestProcessor<I extends 
WritableComparable,
   private final ServerData<I, V, E, M> serverData;
 
   // Per-Superstep Metrics
-  /** messages sent in a superstep */
-  private final ValueGauge<Long> msgsSentInSuperstep;
+  /** Number of requests that went on the wire */
+  private final Counter localRequests;
+  /** Number of requests that were handled locally */
+  private final Counter remoteRequests;
 
   /**
    * Constructor.
@@ -123,8 +130,25 @@ public class NettyWorkerClientRequestProcessor<I extends 
WritableComparable,
 
     // Per-Superstep Metrics.
     // Since this object is not long lived we just initialize the metrics here.
-    GiraphMetrics gmr = GiraphMetrics.get();
-    msgsSentInSuperstep = new ValueGauge<Long>(gmr.perSuperstep(), 
"msgs-sent");
+    SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep();
+    localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS);
+    remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS);
+    final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS,
+        new Gauge<Long>() {
+          @Override public Long value() {
+            return localRequests.count() + remoteRequests.count();
+          }
+        }
+    );
+    smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() {
+      @Override protected double getNumerator() {
+        return localRequests.count();
+      }
+
+      @Override protected double getDenominator() {
+        return totalRequests.value();
+      }
+    });
   }
 
   @Override
@@ -374,7 +398,6 @@ public class NettyWorkerClientRequestProcessor<I extends 
WritableComparable,
 
   @Override
   public long resetMessageCount() {
-    msgsSentInSuperstep.set(totalMsgsSentInSuperstep);
     long messagesSentInSuperstep = totalMsgsSentInSuperstep;
     totalMsgsSentInSuperstep = 0;
     return messagesSentInSuperstep;
@@ -397,9 +420,11 @@ public class NettyWorkerClientRequestProcessor<I extends 
WritableComparable,
     if (serviceWorker.getWorkerInfo().getTaskId() ==
         workerInfo.getTaskId()) {
       ((WorkerRequest) writableRequest).doRequest(serverData);
+      localRequests.inc();
     } else {
       workerClient.sendWritableRequest(
           workerInfo.getTaskId(), writableRequest);
+      remoteRequests.inc();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
index 22e4944..a96842d 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java
@@ -20,11 +20,14 @@ package org.apache.giraph.comm.requests;
 
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricNames;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
+import com.yammer.metrics.core.Histogram;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -110,6 +113,9 @@ public class SendPartitionMutationsRequest<I extends 
WritableComparable,
   public void doRequest(ServerData<I, V, E, M> serverData) {
     ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations =
       serverData.getVertexMutations();
+    Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
+        .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
+    verticesInMutationHist.update(vertexMutations.size());
     for (Entry<I, VertexMutations<I, V, E, M>> entry :
         vertexIdMutations.entrySet()) {
       VertexMutations<I, V, E, M> mutations =

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index a87561d..94ed6d9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -17,29 +17,22 @@
  */
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.TimerDesc;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStats;
-import org.apache.giraph.metrics.GiraphMetrics;
-
-import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
-import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.time.Times;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
 import org.apache.hadoop.io.Writable;
@@ -47,6 +40,18 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+
 /**
  * Compute as many vertex partitions as possible.  Every thread will has its
  * own instance of WorkerClientRequestProcessor to send requests.  Note that
@@ -62,8 +67,6 @@ import org.apache.log4j.Logger;
  */
 public class ComputeCallable<I extends WritableComparable, V extends Writable,
     E extends Writable, M extends Writable> implements Callable {
-  /** Name of timer for compute call */
-  public static final String TIMER_COMPUTE_ONE = "compute-one";
   /** Class logger */
   private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
   /** Class time object */
@@ -89,6 +92,8 @@ public class ComputeCallable<I extends WritableComparable, V 
extends Writable,
   private final long startNanos = TIME.getNanoseconds();
 
   // Per-Superstep Metrics
+  /** Messages sent */
+  private final Counter messagesSentCounter;
   /** Timer for single compute() call */
   private final Timer computeOneTimer;
 
@@ -116,10 +121,11 @@ public class ComputeCallable<I extends 
WritableComparable, V extends Writable,
     // Will be replaced later in call() for locality
     this.graphState = graphState;
 
-    GiraphMetrics metrics = GiraphMetrics.get();
+    SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep();
     // Normally we would use ResetSuperstepMetricsObserver but this class is
     // not long-lived, so just instantiating in the constructor is good enough.
-    computeOneTimer = metrics.perSuperstep().getTimer(TIMER_COMPUTE_ONE);
+    computeOneTimer = metrics.getTimer(TimerDesc.COMPUTE_ONE);
+    messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
   }
 
   @Override
@@ -148,8 +154,9 @@ public class ComputeCallable<I extends WritableComparable, 
V extends Writable,
       try {
         PartitionStats partitionStats = computePartition(partition);
         partitionStatsList.add(partitionStats);
-        partitionStats.addMessagesSentCount(
-            workerClientRequestProcessor.resetMessageCount());
+        long partitionMsgs = workerClientRequestProcessor.resetMessageCount();
+        partitionStats.addMessagesSentCount(partitionMsgs);
+        messagesSentCounter.inc(partitionMsgs);
         timedLogger.info("call: Completed " +
             partitionStatsList.size() + " partitions, " +
             partitionIdQueue.size() + " remaining " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index f7fb7e9..3624728 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -312,7 +312,6 @@ public class GraphTaskManager<I extends WritableComparable, 
V extends Writable,
    * Utility to encapsulate Giraph metrics setup calls
    */
   private void setupAndInitializeGiraphMetrics() {
-    // Set up GiraphMetrics
     GiraphMetrics.init(conf);
     GiraphMetrics.get().addSuperstepResetObserver(this);
     initJobMetrics();
@@ -368,7 +367,7 @@ public class GraphTaskManager<I extends WritableComparable, 
V extends Writable,
       serviceWorker.finishSuperstep(graphState, partitionStatsList);
     superstepTimerContext.stop();
     if (conf.metricsEnabled()) {
-      GiraphMetrics.get().perSuperstep().printSummary();
+      GiraphMetrics.get().perSuperstep().printSummary(System.err);
     }
     return finishedSuperstepStats;
   }
@@ -802,8 +801,7 @@ public class GraphTaskManager<I extends WritableComparable, 
V extends Writable,
     if (done) {
       return true;
     }
-    GiraphMetrics.get().
-      resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
+    GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
     if (graphFunctions.isNotAWorker()) {
       if (LOG.isInfoEnabled()) {
         LOG.info("map: No need to do anything when not a worker");

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java 
b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 677ab82..1f4a184 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -923,7 +923,7 @@ public class BspServiceMaster<I extends WritableComparable,
     }
 
     if (conf.metricsEnabled()) {
-      aggregatedMetrics.print(superstep);
+      aggregatedMetrics.print(superstep, System.err);
     }
 
     if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
index 5f66823..9ce074b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
@@ -105,16 +105,6 @@ public class AggregatedMetrics {
   }
 
   /**
-   * Print the aggregated metrics to the stream provided.
-   *
-   * @param superstep long number of superstep.
-   * @return this
-   */
-  public AggregatedMetrics print(long superstep) {
-    return print(superstep, System.out);
-  }
-
-  /**
    * Print batch of lines for AggregatedMetric
    *
    * @param out PrintStream to write to

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
index 43e40d9..d104ec1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java
@@ -131,11 +131,4 @@ public class GiraphMetrics {
     perJob.printToStream(out);
     perSuperstep.printToStream(out);
   }
-
-  /**
-   * Dump all metrics to stdout.
-   */
-  public void dumpToStdout() {
-    dumpToStream(System.out);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
 
b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
index 184c97e..355e510 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java
@@ -168,14 +168,25 @@ public class GiraphMetricsRegistry {
   }
 
   /**
-   * Creates a new non-biased {@link com.yammer.metrics.core.Histogram} and
-   * registers it under the given group and name.
+   * Creates a new biased {@link Histogram} and registers it under the given
+   * group and name
    *
-   * @param name  the name of the metric
-   * @return a new {@link com.yammer.metrics.core.Histogram}
+   * @param name name of metric
+   * @return new {@link Histogram}
    */
-  public Histogram getHistogram(String name) {
-    return registry.newHistogram(makeMetricName(name), false);
+  public Histogram getBiasedHistogram(String name) {
+    return getHistogram(name, true);
+  }
+
+  /**
+   * Creates a new uniform {@link Histogram} and registers it under the given
+   * group and name
+   *
+   * @param name name of metric
+   * @return new {@link Histogram}
+   */
+  public Histogram getUniformHistogram(String name) {
+    return getHistogram(name, false);
   }
 
   /**
@@ -186,7 +197,7 @@ public class GiraphMetricsRegistry {
    * @param biased whether or not the histogram should be biased
    * @return a new {@link Histogram}
    */
-  public Histogram getHistogram(String name, boolean biased) {
+  private Histogram getHistogram(String name, boolean biased) {
     return registry.newHistogram(makeMetricName(name), biased);
   }
 
@@ -194,6 +205,18 @@ public class GiraphMetricsRegistry {
    * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under
    * the given group and name.
    *
+   * @param meterDesc description of meter
+   * @return new {@link com.yammer.metrics.core.Meter}
+   */
+  public Meter getMeter(MeterDesc meterDesc) {
+    return getMeter(meterDesc.getName(), meterDesc.getType(),
+        meterDesc.getTimeUnit());
+  }
+
+  /**
+   * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under
+   * the given group and name.
+   *
    * @param name      the name of the metric
    * @param eventType the plural name of the type of events the meter is
    *                  measuring (e.g., {@code "requests"})
@@ -205,15 +228,15 @@ public class GiraphMetricsRegistry {
   }
 
   /**
-   * Creates a new {@link com.yammer.metrics.core.Timer} and registers it under
-   * the given group and name, measuring elapsed time in milliseconds and
-   * invocations per second.
+   * Create a new {@link Timer} from the description and registers it under the
+   * given group and name.
    *
-   * @param name  the name of the metric
-   * @return a new {@link com.yammer.metrics.core.Timer}
+   * @param timerDesc TimerDesc describing the timer
+   * @return new {@link Timer}
    */
-  public Timer getTimer(String name) {
-    return getTimer(name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+  public Timer getTimer(TimerDesc timerDesc) {
+    return getTimer(timerDesc.getName(), timerDesc.getDurationUnit(),
+        timerDesc.getTimeUnit());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java
new file mode 100644
index 0000000..88bd542
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Description for Meters used in Giraph.
+ */
+public enum MeterDesc {
+  /** Number of requests received */
+  RECEIVED_REQUESTS("requests-received", "requests", TimeUnit.SECONDS),
+  /** Number of requests sent */
+  SENT_REQUESTS("requests-sent", "requests", TimeUnit.SECONDS),
+  /** Total edges loaded */
+  EDGES_LOADED("edges-loaded", "edges", TimeUnit.SECONDS),
+  /** Total vertices loaded */
+  VERTICES_LOADED("vertices-loaded", "vertices", TimeUnit.SECONDS);
+
+  /** Name of meter */
+  private final String name;
+  /** Type this meter tracks */
+  private final String type;
+  /** TimeUnit this meter tracks in */
+  private final TimeUnit timeUnit;
+
+  /**
+   * Constructor
+   * @param name String name of meter
+   * @param type String type of meter
+   * @param timeUnit TimeUnit meter tracks
+   */
+  private MeterDesc(String name, String type, TimeUnit timeUnit) {
+    this.name = name;
+    this.type = type;
+    this.timeUnit = timeUnit;
+  }
+
+  /**
+   * Get name of meter
+   * @return String name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get TimeUnit of meter
+   * @return TimeUnit
+   */
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  /**
+   * Get type of meter
+   * @return String type
+   */
+  public String getType() {
+    return type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
new file mode 100644
index 0000000..52a3d15
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Class holding metric names used in Giraph
+ */
+// CHECKSTYLE: stop InterfaceIsTypeCheck
+public interface MetricNames {
+// CHECKSTYLE: resume InterfaceIsTypeCheck
+
+  
//////////////////////////////////////////////////////////////////////////////
+  // Request counters per superstep
+  
//////////////////////////////////////////////////////////////////////////////
+  /** Counter of requests handled locally */
+  String LOCAL_REQUESTS = "local-requests";
+  /** Counter of requets sent on the wire */
+  String REMOTE_REQUESTS = "remote-requests";
+  /** Guage of total requests */
+  String TOTAL_REQUESTS = "total-requests";
+  /** PercentGauge of requests that are handled locally */
+  String PERCENT_LOCAL_REQUESTS = "percent-local-requests";
+
+  /** Counter for sending vertices requests */
+  String SEND_VERTEX_REQUESTS = "send-vertex-requests";
+  /** Counter for sending a partition of messages for next superstep */
+  String SEND_WORKER_MESSAGES_REQUESTS = "send-worker-messages-requests";
+  /**
+   * Counter for sending a partition of messages for current superstep
+   * (used during partition exchange)
+   */
+  String SEND_PARTITION_CURRENT_MESSAGES_REQUESTS =
+      "send-partition-current-messages-requests";
+  /** Counter for sending a partition of mutations */
+  String SEND_PARTITION_MUTATIONS_REQUESTS =
+      "send-partition-mutations-requests";
+  /** Counter for sending aggregated values from one worker's vertices */
+  String SEND_WORKER_AGGREGATORS_REQUESTS = "send-worker-aggregators-requests";
+  /** Counter for sending aggregated values from worker owner to master */
+  String SEND_AGGREGATORS_TO_MASTER_REQUESTS =
+      "send-aggregators-to-master-requests";
+  /** Counter for sending aggregators from master to worker owners */
+  String SEND_AGGREGATORS_TO_OWNER_REQUESTS =
+      "send-aggregators-to-owner-requests";
+  /** Counter for sending aggregators from worker owner to other workers */
+  String SEND_AGGREGATORS_TO_WORKER_REQUESTS =
+      "send-aggregators-to-worker-requests";
+  
//////////////////////////////////////////////////////////////////////////////
+  // End of Request counters per superstep
+  
//////////////////////////////////////////////////////////////////////////////
+
+  /** Counter of messages sent in superstep */
+  String MESSAGES_SENT = "messages-sent";
+
+  /** Histogram for vertices in mutations requests */
+  String VERTICES_IN_MUTATION_REQUEST = "vertices-per-mutations-request";
+
+  /** Number of bytes sent in superstep */
+  String SENT_BYTES = "sent-bytes";
+  /** Number of bytes received in superstep */
+  String RECEIVED_BYTES = "received-bytes";
+
+  /** PercentGauge of memory free */
+  String MEMORY_FREE_PERCENT = "memory-free-pct";
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
 
b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
index 71aad31..57b858e 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
@@ -49,6 +49,14 @@ public class SuperstepMetricsRegistry extends 
GiraphMetricsRegistry {
   }
 
   /**
+   * Get superstep stored here
+   * @return long superstep
+   */
+  public long getSuperstep() {
+    return superstep;
+  }
+
+  /**
    * Set superstep number used. Internally sets the group for metrics created.
    *
    * @param superstep long number of superstep to use.
@@ -66,11 +74,4 @@ public class SuperstepMetricsRegistry extends 
GiraphMetricsRegistry {
   public void printSummary(PrintStream out) {
     new WorkerSuperstepMetrics().readFromRegistry().print(superstep, out);
   }
-
-  /**
-   * Print human readable summary of superstep metrics.
-   */
-  public void printSummary() {
-    printSummary(System.out);
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java
new file mode 100644
index 0000000..1e7f3a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Description for Timers used in Giraph
+ */
+public enum TimerDesc {
+
+  /** Timer around Vertex#compute() */
+  COMPUTE_ONE("compute-one", TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+
+  /** Name of timer */
+  private final String name;
+  /** Duration unit for timer */
+  private final TimeUnit durationUnit;
+  /** Time unit for timer */
+  private final TimeUnit timeUnit;
+
+  /**
+   * Constructor
+   * @param name String name of timer
+   * @param durationUnit Duration unit of timer
+   * @param timeUnit Time unit of timer
+   */
+  private TimerDesc(String name, TimeUnit durationUnit, TimeUnit timeUnit) {
+    this.name = name;
+    this.durationUnit = durationUnit;
+    this.timeUnit = timeUnit;
+  }
+
+  /**
+   * Get duration unit of timer
+   * @return TimeUnit
+   */
+  public TimeUnit getDurationUnit() {
+    return durationUnit;
+  }
+
+  /**
+   * Get name of timer
+   * @return String name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get time unit of timer
+   * @return TimeUnit of timer
+   */
+  public TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
 
b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 90567e4..e51f96e 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -18,9 +18,8 @@
 
 package org.apache.giraph.metrics;
 
-import org.apache.giraph.worker.BspServiceWorker;
-import org.apache.giraph.graph.ComputeCallable;
 import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.hadoop.io.Writable;
 
 import com.yammer.metrics.core.Gauge;
@@ -75,8 +74,7 @@ public class WorkerSuperstepMetrics implements Writable {
     readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
     readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
     readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
-    userComputeTime.setValue((long) ssm.getTimer(
-        ComputeCallable.TIMER_COMPUTE_ONE).sum());
+    userComputeTime.setValue((long) ssm.getTimer(TimerDesc.COMPUTE_ONE).sum());
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
index eec8388..b5ebb10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
@@ -18,9 +18,11 @@
 
 package org.apache.giraph.utils;
 
-import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.metrics.MetricNames;
+
+import com.yammer.metrics.util.PercentGauge;
 
 /**
  * Helper static methods for tracking memory usage.
@@ -67,7 +69,7 @@ public class MemoryUtils {
    */
   public static void initMetrics() {
     GiraphMetricsRegistry metrics = GiraphMetrics.get().perJob();
-    metrics.getGauge("memory-free-pct", new PercentGauge() {
+    metrics.getGauge(MetricNames.MEMORY_FREE_PERCENT, new PercentGauge() {
         @Override
         protected double getNumerator() {
           return freeMemoryMB();
@@ -87,8 +89,7 @@ public class MemoryUtils {
    * @return String of all Runtime stats.
    */
   public static String getRuntimeMemoryStats() {
-    return "totalMem = " + totalMemoryMB() +
-      "M, maxMem = "  + maxMemoryMB() +
-      "M, freeMem = " + freeMemoryMB() + "M";
+    return String.format("Memory (free/total/max) = %.2fM / %.2fM / %.2fM",
+            freeMemoryMB(), totalMemoryMB(), maxMemoryMB());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index e48e01a..fa3ab49 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1015,7 +1015,7 @@ else[HADOOP_NON_SECURE]*/
     }
 
     if (getConfiguration().metricsEnabled()) {
-      GiraphMetrics.get().dumpToStdout();
+      GiraphMetrics.get().dumpToStream(System.err);
     }
 
     // Preferably would shut down the service only after

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 3e2dc66..bdf9f57 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -24,8 +24,6 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeReader;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.zk.ZooKeeperExt;
@@ -36,7 +34,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Meter;
 
 import java.io.IOException;
 
@@ -53,17 +51,18 @@ import java.io.IOException;
 public class EdgeInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends InputSplitsCallable<I, V, E, M> {
+  /** How often to update metrics and print info */
+  public static final int VERTICES_UPDATE_PERIOD = 1000000;
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(
       EdgeInputSplitsCallable.class);
-  /** Total edges loaded */
-  private long totalEdgesLoaded = 0;
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
 
   // Metrics
-  /** number of edges loaded counter */
-  private final Counter edgesLoadedCounter;
+  /** edges loaded meter across all readers */
+  private final Meter totalEdgesMeter;
 
   /**
    * Constructor.
@@ -88,8 +87,7 @@ public class EdgeInputSplitsCallable<I extends 
WritableComparable,
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
 
     // Initialize Metrics
-    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
-    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
+    totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
   /**
@@ -137,11 +135,13 @@ public class EdgeInputSplitsCallable<I extends 
WritableComparable,
       context.progress(); // do this before potential data transfer
       ++inputSplitEdgesLoaded;
 
-      // Update status every 1M edges
-      if (((inputSplitEdgesLoaded + totalEdgesLoaded) % 1000000) == 0) {
+      // Update status every VERTICES_UPDATE_PERIOD edges
+      if (inputSplitEdgesLoaded % VERTICES_UPDATE_PERIOD == 0) {
+        totalEdgesMeter.mark(VERTICES_UPDATE_PERIOD);
         LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
-            "readInputSplit: Loaded " +
-                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+            "readEdgeInputSplit: Loaded " +
+                totalEdgesMeter.count() + " edges at " +
+                totalEdgesMeter.meanRate() + " edges/sec " +
                 MemoryUtils.getRuntimeMemoryStats());
       }
 
@@ -158,8 +158,6 @@ public class EdgeInputSplitsCallable<I extends 
WritableComparable,
       }
     }
     edgeReader.close();
-    totalEdgesLoaded += inputSplitEdgesLoaded;
-    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
     return new VertexEdgeCount(0, inputSplitEdgesLoaded);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 5487ab7..0ec20fd 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -23,6 +23,8 @@ import 
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MeterDesc;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -36,6 +38,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 
+import com.yammer.metrics.core.Meter;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -54,10 +58,6 @@ import java.util.concurrent.Callable;
 public abstract class InputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements Callable<VertexEdgeCount> {
-  /** Name of counter for vertices loaded */
-  public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
-  /** Name of counter for edges loaded */
-  public static final String COUNTER_EDGES_LOADED = "edges-loaded";
   /** Class logger */
   private static final Logger LOG = 
Logger.getLogger(InputSplitsCallable.class);
   /** Class time object */
@@ -118,6 +118,24 @@ public abstract class InputSplitsCallable<I extends 
WritableComparable,
   // CHECKSTYLE: resume ParameterNumberCheck
 
   /**
+   * Get Meter tracking edges loaded
+   *
+   * @return Meter tracking edges loaded
+   */
+  public static Meter getTotalEdgesLoadedMeter() {
+    return GiraphMetrics.get().perJob().getMeter(MeterDesc.EDGES_LOADED);
+  }
+
+  /**
+   * Get Meter tracking number of vertices loaded.
+   *
+   * @return Meter for vertices loaded
+   */
+  public static Meter getTotalVerticesLoadedMeter() {
+    return GiraphMetrics.get().perJob().getMeter(MeterDesc.VERTICES_LOADED);
+  }
+
+  /**
    * Load vertices/edges from the given input split.
    *
    * @param inputSplit Input split to load

http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
 
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index a192aeb..de1ae59 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -18,15 +18,11 @@
 
 package org.apache.giraph.worker;
 
-import com.yammer.metrics.core.Counter;
-import java.io.IOException;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.giraph.metrics.GiraphMetricsRegistry;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -39,6 +35,10 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import com.yammer.metrics.core.Meter;
+
+import java.io.IOException;
+
 /**
  * Load as many vertex input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -52,23 +52,21 @@ import org.apache.log4j.Logger;
 public class VertexInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends InputSplitsCallable<I, V, E, M> {
+  /** How often to update metrics and print info */
+  public static final int VERTICES_UPDATE_PERIOD = 250000;
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(VertexInputSplitsCallable.class);
-  /** Total vertices loaded */
-  private long totalVerticesLoaded = 0;
-  /** Total edges loaded */
-  private long totalEdgesLoaded = 0;
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
   /** Bsp service worker (only use thread-safe methods) */
   private final BspServiceWorker<I, V, E, M> bspServiceWorker;
 
   // Metrics
-  /** number of vertices loaded counter */
-  private final Counter verticesLoadedCounter;
-  /** number of edges loaded counter */
-  private final Counter edgesLoadedCounter;
+  /** number of vertices loaded meter across all readers */
+  private final Meter totalVerticesMeter;
+  /** number of edges loaded meter across all readers */
+  private final Meter totalEdgesMeter;
 
   /**
    * Constructor.
@@ -94,9 +92,8 @@ public class VertexInputSplitsCallable<I extends 
WritableComparable,
     this.bspServiceWorker = bspServiceWorker;
 
     // Initialize Metrics
-    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
-    verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED);
-    edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED);
+    totalVerticesMeter = getTotalVerticesLoadedMeter();
+    totalEdgesMeter = getTotalEdgesLoadedMeter();
   }
 
   /**
@@ -120,9 +117,8 @@ public class VertexInputSplitsCallable<I extends 
WritableComparable,
         vertexInputFormat.createVertexReader(inputSplit, context);
     vertexReader.initialize(inputSplit, context);
     long inputSplitVerticesLoaded = 0;
+    long edgesSinceLastUpdate = 0;
     long inputSplitEdgesLoaded = 0;
-    long nextPrintVertices = 0;
-    long nextPrintMsecs = System.currentTimeMillis() + 15000;
     while (vertexReader.nextVertex()) {
       Vertex<I, V, E, M> readerVertex =
           vertexReader.getCurrentVertex();
@@ -143,22 +139,23 @@ public class VertexInputSplitsCallable<I extends 
WritableComparable,
           partitionOwner, readerVertex);
       context.progress(); // do this before potential data transfer
       ++inputSplitVerticesLoaded;
-      inputSplitEdgesLoaded += readerVertex.getNumEdges();
+      edgesSinceLastUpdate += readerVertex.getNumEdges();
+
+      // Update status every VERTICES_UPDATE_PERIOD vertices
+      if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) {
+        totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD);
+        totalEdgesMeter.mark(edgesSinceLastUpdate);
+        inputSplitEdgesLoaded += edgesSinceLastUpdate;
+        edgesSinceLastUpdate = 0;
 
-      // Update status at most every 250k vertices or 15 seconds
-      if ((inputSplitVerticesLoaded + totalVerticesLoaded) >
-          nextPrintVertices &&
-          System.currentTimeMillis() > nextPrintMsecs) {
         LoggerUtils.setStatusAndLog(
             context, LOG, Level.INFO,
-            "readInputSplit: Loaded " +
-                (inputSplitVerticesLoaded + totalVerticesLoaded) +
-                " vertices " +
-                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+            "readVertexInputSplit: Loaded " +
+                totalVerticesMeter.count() + " vertices at " +
+                totalVerticesMeter.meanRate() + " vertices/sec " +
+                totalEdgesMeter.count() + " edges at " +
+                totalEdgesMeter.meanRate() + " edges/sec " +
                 MemoryUtils.getRuntimeMemoryStats());
-        nextPrintMsecs = System.currentTimeMillis() + 15000;
-        nextPrintVertices = inputSplitVerticesLoaded + totalVerticesLoaded +
-            250000;
       }
 
       // For sampling, or to limit outlier input splits, the number of
@@ -174,12 +171,8 @@ public class VertexInputSplitsCallable<I extends 
WritableComparable,
       }
     }
     vertexReader.close();
-    totalVerticesLoaded += inputSplitVerticesLoaded;
-    verticesLoadedCounter.inc(inputSplitVerticesLoaded);
-    totalEdgesLoaded += inputSplitEdgesLoaded;
-    edgesLoadedCounter.inc(inputSplitEdgesLoaded);
-    return new VertexEdgeCount(
-        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+    return new VertexEdgeCount(inputSplitVerticesLoaded,
+        inputSplitEdgesLoaded + edgesSinceLastUpdate);
   }
 }
 

Reply via email to