Updated Branches: refs/heads/trunk 02fb21d93 -> 8ed06fe5f
GIRAPH-505: Metrics Updates (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8ed06fe5 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8ed06fe5 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8ed06fe5 Branch: refs/heads/trunk Commit: 8ed06fe5f736d135ca4a7fb208a20f5844c15781 Parents: 02fb21d Author: Nitay Joffe <[email protected]> Authored: Wed Feb 6 14:30:03 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Wed Feb 6 19:13:43 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/comm/netty/ByteCounter.java | 16 ++-- .../giraph/comm/netty/NettyWorkerClient.java | 48 ++++++++- .../netty/NettyWorkerClientRequestProcessor.java | 37 ++++++- .../requests/SendPartitionMutationsRequest.java | 6 + .../org/apache/giraph/graph/ComputeCallable.java | 47 +++++---- .../org/apache/giraph/graph/GraphTaskManager.java | 6 +- .../org/apache/giraph/master/BspServiceMaster.java | 2 +- .../apache/giraph/metrics/AggregatedMetrics.java | 10 -- .../org/apache/giraph/metrics/GiraphMetrics.java | 7 -- .../giraph/metrics/GiraphMetricsRegistry.java | 51 +++++++--- .../java/org/apache/giraph/metrics/MeterDesc.java | 78 ++++++++++++++ .../org/apache/giraph/metrics/MetricNames.java | 81 +++++++++++++++ .../giraph/metrics/SuperstepMetricsRegistry.java | 15 ++-- .../java/org/apache/giraph/metrics/TimerDesc.java | 72 +++++++++++++ .../giraph/metrics/WorkerSuperstepMetrics.java | 6 +- .../java/org/apache/giraph/utils/MemoryUtils.java | 11 +- .../org/apache/giraph/worker/BspServiceWorker.java | 2 +- .../giraph/worker/EdgeInputSplitsCallable.java | 28 +++--- .../apache/giraph/worker/InputSplitsCallable.java | 26 ++++- .../giraph/worker/VertexInputSplitsCallable.java | 63 +++++------ 21 files changed, 471 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 05bcf91..c6e2b75 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-505: Metrics Updates (nitay) + GIRAPH-506: Concurrency issue - response can arrive before request is added to the outstanding map (majakabiljo) GIRAPH-501: WorkerObserver (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java index 825e383..6bba199 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounter.java @@ -19,6 +19,8 @@ package org.apache.giraph.comm.netty; import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MeterDesc; +import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.time.SystemTime; @@ -36,7 +38,6 @@ import com.yammer.metrics.core.NoOpHistogram; import com.yammer.metrics.core.NoOpMeter; import java.text.DecimalFormat; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -86,12 +87,13 @@ public class ByteCounter extends SimpleChannelHandler implements @Override public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) { - sentRequestsMeter = superstepMetrics.getMeter("sent-requests", "requests", - TimeUnit.SECONDS); - sentBytesHist = superstepMetrics.getHistogram("sent-bytes", false); - receivedRequestsMeter = 
superstepMetrics.getMeter("received-requests", - "request", TimeUnit.SECONDS); - receivedBytesHist = superstepMetrics.getHistogram("received-bytes", false); + sentRequestsMeter = superstepMetrics.getMeter(MeterDesc.SENT_REQUESTS); + sentBytesHist = superstepMetrics.getUniformHistogram( + MetricNames.SENT_BYTES); + receivedRequestsMeter = superstepMetrics.getMeter( + MeterDesc.RECEIVED_REQUESTS); + receivedBytesHist = superstepMetrics.getUniformHistogram( + MetricNames.RECEIVED_BYTES); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java index 9e6ed66..9c09524 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java @@ -18,22 +18,30 @@ package org.apache.giraph.comm.netty; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClient; +import org.apache.giraph.comm.requests.RequestType; import org.apache.giraph.comm.requests.WritableRequest; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.TaskInfo; -import org.apache.giraph.worker.WorkerInfo; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MetricNames; +import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import 
org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.yammer.metrics.core.Counter; import java.io.IOException; import java.util.List; +import java.util.Map; /** * Takes users facing APIs in {@link WorkerClient} and implements them @@ -47,7 +55,7 @@ import java.util.List; @SuppressWarnings("rawtypes") public class NettyWorkerClient<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements - WorkerClient<I, V, E, M> { + WorkerClient<I, V, E, M>, ResetSuperstepMetricsObserver { /** Class logger */ private static final Logger LOG = Logger.getLogger(NettyWorkerClient.class); /** Hadoop configuration */ @@ -57,6 +65,10 @@ public class NettyWorkerClient<I extends WritableComparable, /** Centralized service, needed to get vertex ranges */ private final CentralizedServiceWorker<I, V, E, M> service; + // Metrics + /** Per-superstep, per-request counters */ + private final Map<RequestType, Counter> superstepRequestCounters; + /** * Only constructor. 
* @@ -72,6 +84,31 @@ public class NettyWorkerClient<I extends WritableComparable, new NettyClient(context, configuration, service.getWorkerInfo()); this.conf = configuration; this.service = service; + this.superstepRequestCounters = Maps.newHashMap(); + GiraphMetrics.get().addSuperstepResetObserver(this); + } + + @Override + public void newSuperstep(SuperstepMetricsRegistry metrics) { + superstepRequestCounters.clear(); + superstepRequestCounters.put(RequestType.SEND_VERTEX_REQUEST, + metrics.getCounter(MetricNames.SEND_VERTEX_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_WORKER_MESSAGES_REQUEST, + metrics.getCounter(MetricNames.SEND_WORKER_MESSAGES_REQUESTS)); + superstepRequestCounters.put( + RequestType.SEND_PARTITION_CURRENT_MESSAGES_REQUEST, + metrics.getCounter( + MetricNames.SEND_PARTITION_CURRENT_MESSAGES_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_PARTITION_MUTATIONS_REQUEST, + metrics.getCounter(MetricNames.SEND_PARTITION_MUTATIONS_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_WORKER_AGGREGATORS_REQUEST, + metrics.getCounter(MetricNames.SEND_WORKER_AGGREGATORS_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST, + metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_MASTER_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST, + metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_OWNER_REQUESTS)); + superstepRequestCounters.put(RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST, + metrics.getCounter(MetricNames.SEND_AGGREGATORS_TO_WORKER_REQUESTS)); } public CentralizedServiceWorker<I, V, E, M> getService() { @@ -100,6 +137,10 @@ public class NettyWorkerClient<I extends WritableComparable, @Override public void sendWritableRequest(Integer destTaskId, WritableRequest request) { + Counter counter = superstepRequestCounters.get(request.getType()); + if (counter != null) { + counter.inc(); + } nettyClient.sendWritableRequest(destTaskId, request); } @@ 
-134,5 +175,6 @@ else[HADOOP_NON_SECURE]*/ public void authenticate() { nettyClient.authenticate(); } + /*end[HADOOP_NON_SECURE]*/ } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index 2a4ee8d..d4e919e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -37,7 +37,8 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.metrics.GiraphMetrics; -import org.apache.giraph.metrics.ValueGauge; +import org.apache.giraph.metrics.MetricNames; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.ByteArrayVertexIdMessages; @@ -49,6 +50,10 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.util.PercentGauge; + import java.io.IOException; import java.util.Map; @@ -91,8 +96,10 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, private final ServerData<I, V, E, M> serverData; // Per-Superstep Metrics - /** messages sent in a superstep */ - private final ValueGauge<Long> msgsSentInSuperstep; + /** Number of requests that were handled locally */ + private final Counter localRequests; + /** 
Number of requests that went on the wire */ + private final Counter remoteRequests; /** * Constructor. @@ -123,8 +130,25 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, // Per-Superstep Metrics. // Since this object is not long lived we just initialize the metrics here. - GiraphMetrics gmr = GiraphMetrics.get(); - msgsSentInSuperstep = new ValueGauge<Long>(gmr.perSuperstep(), "msgs-sent"); + SuperstepMetricsRegistry smr = GiraphMetrics.get().perSuperstep(); + localRequests = smr.getCounter(MetricNames.LOCAL_REQUESTS); + remoteRequests = smr.getCounter(MetricNames.REMOTE_REQUESTS); + final Gauge<Long> totalRequests = smr.getGauge(MetricNames.TOTAL_REQUESTS, + new Gauge<Long>() { + @Override public Long value() { + return localRequests.count() + remoteRequests.count(); + } + } + ); + smr.getGauge(MetricNames.PERCENT_LOCAL_REQUESTS, new PercentGauge() { + @Override protected double getNumerator() { + return localRequests.count(); + } + + @Override protected double getDenominator() { + return totalRequests.value(); + } + }); } @Override @@ -374,7 +398,6 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, @Override public long resetMessageCount() { - msgsSentInSuperstep.set(totalMsgsSentInSuperstep); long messagesSentInSuperstep = totalMsgsSentInSuperstep; totalMsgsSentInSuperstep = 0; return messagesSentInSuperstep; @@ -397,9 +420,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, if (serviceWorker.getWorkerInfo().getTaskId() == workerInfo.getTaskId()) { ((WorkerRequest) writableRequest).doRequest(serverData); + localRequests.inc(); } else { workerClient.sendWritableRequest( workerInfo.getTaskId(), writableRequest); + remoteRequests.inc(); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java ---------------------------------------------------------------------- diff 
--git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java index 22e4944..a96842d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionMutationsRequest.java @@ -20,11 +20,14 @@ package org.apache.giraph.comm.requests; import org.apache.giraph.comm.ServerData; import org.apache.giraph.graph.VertexMutations; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MetricNames; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; import com.google.common.collect.Maps; +import com.yammer.metrics.core.Histogram; import java.io.DataInput; import java.io.DataOutput; @@ -110,6 +113,9 @@ public class SendPartitionMutationsRequest<I extends WritableComparable, public void doRequest(ServerData<I, V, E, M> serverData) { ConcurrentHashMap<I, VertexMutations<I, V, E, M>> vertexMutations = serverData.getVertexMutations(); + Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep() + .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST); + verticesInMutationHist.update(vertexMutations.size()); for (Entry<I, VertexMutations<I, V, E, M>> entry : vertexIdMutations.entrySet()) { VertexMutations<I, V, E, M> mutations = http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index a87561d..94ed6d9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -17,29 +17,22 @@ */ package org.apache.giraph.graph; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.yammer.metrics.core.Timer; -import com.yammer.metrics.core.TimerContext; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.messages.MessageStoreByPartition; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MetricNames; +import org.apache.giraph.metrics.SuperstepMetricsRegistry; +import org.apache.giraph.metrics.TimerDesc; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStats; -import org.apache.giraph.metrics.GiraphMetrics; - -import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; -import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.time.Times; +import org.apache.giraph.utils.MemoryUtils; +import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.WorkerThreadAggregatorUsage; import org.apache.hadoop.io.Writable; @@ -47,6 +40,18 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Timer; +import com.yammer.metrics.core.TimerContext; + +import 
java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; + /** * Compute as many vertex partitions as possible. Every thread will has its * own instance of WorkerClientRequestProcessor to send requests. Note that @@ -62,8 +67,6 @@ import org.apache.log4j.Logger; */ public class ComputeCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements Callable { - /** Name of timer for compute call */ - public static final String TIMER_COMPUTE_ONE = "compute-one"; /** Class logger */ private static final Logger LOG = Logger.getLogger(ComputeCallable.class); /** Class time object */ @@ -89,6 +92,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, private final long startNanos = TIME.getNanoseconds(); // Per-Superstep Metrics + /** Messages sent */ + private final Counter messagesSentCounter; /** Timer for single compute() call */ private final Timer computeOneTimer; @@ -116,10 +121,11 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, // Will be replaced later in call() for locality this.graphState = graphState; - GiraphMetrics metrics = GiraphMetrics.get(); + SuperstepMetricsRegistry metrics = GiraphMetrics.get().perSuperstep(); // Normally we would use ResetSuperstepMetricsObserver but this class is // not long-lived, so just instantiating in the constructor is good enough. 
- computeOneTimer = metrics.perSuperstep().getTimer(TIMER_COMPUTE_ONE); + computeOneTimer = metrics.getTimer(TimerDesc.COMPUTE_ONE); + messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT); } @Override @@ -148,8 +154,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, try { PartitionStats partitionStats = computePartition(partition); partitionStatsList.add(partitionStats); - partitionStats.addMessagesSentCount( - workerClientRequestProcessor.resetMessageCount()); + long partitionMsgs = workerClientRequestProcessor.resetMessageCount(); + partitionStats.addMessagesSentCount(partitionMsgs); + messagesSentCounter.inc(partitionMsgs); timedLogger.info("call: Completed " + partitionStatsList.size() + " partitions, " + partitionIdQueue.size() + " remaining " + http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index f7fb7e9..3624728 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -312,7 +312,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * Utility to encapsulate Giraph metrics setup calls */ private void setupAndInitializeGiraphMetrics() { - // Set up GiraphMetrics GiraphMetrics.init(conf); GiraphMetrics.get().addSuperstepResetObserver(this); initJobMetrics(); @@ -368,7 +367,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, serviceWorker.finishSuperstep(graphState, partitionStatsList); superstepTimerContext.stop(); if (conf.metricsEnabled()) { - GiraphMetrics.get().perSuperstep().printSummary(); + 
GiraphMetrics.get().perSuperstep().printSummary(System.err); } return finishedSuperstepStats; } @@ -802,8 +801,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (done) { return true; } - GiraphMetrics.get(). - resetSuperstepMetrics(BspService.INPUT_SUPERSTEP); + GiraphMetrics.get().resetSuperstepMetrics(BspService.INPUT_SUPERSTEP); if (graphFunctions.isNotAWorker()) { if (LOG.isInfoEnabled()) { LOG.info("map: No need to do anything when not a worker"); http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 677ab82..1f4a184 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -923,7 +923,7 @@ public class BspServiceMaster<I extends WritableComparable, } if (conf.metricsEnabled()) { - aggregatedMetrics.print(superstep); + aggregatedMetrics.print(superstep, System.err); } if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java index 5f66823..9ce074b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java @@ -105,16 +105,6 @@ public class AggregatedMetrics { } /** - * Print the aggregated metrics to the stream provided. 
- * - * @param superstep long number of superstep. - * @return this - */ - public AggregatedMetrics print(long superstep) { - return print(superstep, System.out); - } - - /** * Print batch of lines for AggregatedMetric * * @param out PrintStream to write to http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java index 43e40d9..d104ec1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java @@ -131,11 +131,4 @@ public class GiraphMetrics { perJob.printToStream(out); perSuperstep.printToStream(out); } - - /** - * Dump all metrics to stdout. - */ - public void dumpToStdout() { - dumpToStream(System.out); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java index 184c97e..355e510 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java @@ -168,14 +168,25 @@ public class GiraphMetricsRegistry { } /** - * Creates a new non-biased {@link com.yammer.metrics.core.Histogram} and - * registers it under the given group and name. 
+ * Creates a new biased {@link Histogram} and registers it under the given + * group and name * - * @param name the name of the metric - * @return a new {@link com.yammer.metrics.core.Histogram} + * @param name name of metric + * @return new {@link Histogram} */ - public Histogram getHistogram(String name) { - return registry.newHistogram(makeMetricName(name), false); + public Histogram getBiasedHistogram(String name) { + return getHistogram(name, true); + } + + /** + * Creates a new uniform {@link Histogram} and registers it under the given + * group and name + * + * @param name name of metric + * @return new {@link Histogram} + */ + public Histogram getUniformHistogram(String name) { + return getHistogram(name, false); } /** @@ -186,7 +197,7 @@ public class GiraphMetricsRegistry { * @param biased whether or not the histogram should be biased * @return a new {@link Histogram} */ - public Histogram getHistogram(String name, boolean biased) { + private Histogram getHistogram(String name, boolean biased) { return registry.newHistogram(makeMetricName(name), biased); } @@ -194,6 +205,18 @@ public class GiraphMetricsRegistry { * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under * the given group and name. * + * @param meterDesc description of meter + * @return new {@link com.yammer.metrics.core.Meter} + */ + public Meter getMeter(MeterDesc meterDesc) { + return getMeter(meterDesc.getName(), meterDesc.getType(), + meterDesc.getTimeUnit()); + } + + /** + * Creates a new {@link com.yammer.metrics.core.Meter} and registers it under + * the given group and name. + * * @param name the name of the metric * @param eventType the plural name of the type of events the meter is * measuring (e.g., {@code "requests"}) @@ -205,15 +228,15 @@ public class GiraphMetricsRegistry { } /** - * Creates a new {@link com.yammer.metrics.core.Timer} and registers it under - * the given group and name, measuring elapsed time in milliseconds and - * invocations per second. 
+ * Create a new {@link Timer} from the description and registers it under the + * given group and name. * - * @param name the name of the metric - * @return a new {@link com.yammer.metrics.core.Timer} + * @param timerDesc TimerDesc describing the timer + * @return new {@link Timer} */ - public Timer getTimer(String name) { - return getTimer(name, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + public Timer getTimer(TimerDesc timerDesc) { + return getTimer(timerDesc.getName(), timerDesc.getDurationUnit(), + timerDesc.getTimeUnit()); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java new file mode 100644 index 0000000..88bd542 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MeterDesc.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.metrics; + +import java.util.concurrent.TimeUnit; + +/** + * Description for Meters used in Giraph. 
+ */ +public enum MeterDesc { + /** Number of requests received */ + RECEIVED_REQUESTS("requests-received", "requests", TimeUnit.SECONDS), + /** Number of requests sent */ + SENT_REQUESTS("requests-sent", "requests", TimeUnit.SECONDS), + /** Total edges loaded */ + EDGES_LOADED("edges-loaded", "edges", TimeUnit.SECONDS), + /** Total vertices loaded */ + VERTICES_LOADED("vertices-loaded", "vertices", TimeUnit.SECONDS); + + /** Name of meter */ + private final String name; + /** Type this meter tracks */ + private final String type; + /** TimeUnit this meter tracks in */ + private final TimeUnit timeUnit; + + /** + * Constructor + * @param name String name of meter + * @param type String type of meter + * @param timeUnit TimeUnit meter tracks + */ + private MeterDesc(String name, String type, TimeUnit timeUnit) { + this.name = name; + this.type = type; + this.timeUnit = timeUnit; + } + + /** + * Get name of meter + * @return String name + */ + public String getName() { + return name; + } + + /** + * Get TimeUnit of meter + * @return TimeUnit + */ + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Get type of meter + * @return String type + */ + public String getType() { + return type; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java new file mode 100644 index 0000000..52a3d15 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.metrics; + +/** + * Class holding metric names used in Giraph + */ +// CHECKSTYLE: stop InterfaceIsTypeCheck +public interface MetricNames { +// CHECKSTYLE: resume InterfaceIsTypeCheck + + ////////////////////////////////////////////////////////////////////////////// + // Request counters per superstep + ////////////////////////////////////////////////////////////////////////////// + /** Counter of requests handled locally */ + String LOCAL_REQUESTS = "local-requests"; + /** Counter of requests sent on the wire */ + String REMOTE_REQUESTS = "remote-requests"; + /** Gauge of total requests */ + String TOTAL_REQUESTS = "total-requests"; + /** PercentGauge of requests that are handled locally */ + String PERCENT_LOCAL_REQUESTS = "percent-local-requests"; + + /** Counter for sending vertices requests */ + String SEND_VERTEX_REQUESTS = "send-vertex-requests"; + /** Counter for sending a partition of messages for next superstep */ + String SEND_WORKER_MESSAGES_REQUESTS = "send-worker-messages-requests"; + /** + * Counter for sending a partition of messages for current superstep + * (used during partition exchange) + */ + String SEND_PARTITION_CURRENT_MESSAGES_REQUESTS = + "send-partition-current-messages-requests"; + /** Counter for sending a partition of mutations */ + String SEND_PARTITION_MUTATIONS_REQUESTS = + "send-partition-mutations-requests"; + /** 
Counter for sending aggregated values from one worker's vertices */ + String SEND_WORKER_AGGREGATORS_REQUESTS = "send-worker-aggregators-requests"; + /** Counter for sending aggregated values from worker owner to master */ + String SEND_AGGREGATORS_TO_MASTER_REQUESTS = + "send-aggregators-to-master-requests"; + /** Counter for sending aggregators from master to worker owners */ + String SEND_AGGREGATORS_TO_OWNER_REQUESTS = + "send-aggregators-to-owner-requests"; + /** Counter for sending aggregators from worker owner to other workers */ + String SEND_AGGREGATORS_TO_WORKER_REQUESTS = + "send-aggregators-to-worker-requests"; + ////////////////////////////////////////////////////////////////////////////// + // End of Request counters per superstep + ////////////////////////////////////////////////////////////////////////////// + + /** Counter of messages sent in superstep */ + String MESSAGES_SENT = "messages-sent"; + + /** Histogram for vertices in mutations requests */ + String VERTICES_IN_MUTATION_REQUEST = "vertices-per-mutations-request"; + + /** Number of bytes sent in superstep */ + String SENT_BYTES = "sent-bytes"; + /** Number of bytes received in superstep */ + String RECEIVED_BYTES = "received-bytes"; + + /** PercentGauge of memory free */ + String MEMORY_FREE_PERCENT = "memory-free-pct"; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java index 71aad31..57b858e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java @@ -49,6 +49,14 @@ public class SuperstepMetricsRegistry extends 
GiraphMetricsRegistry { } /** + * Get superstep stored here + * @return long superstep + */ + public long getSuperstep() { + return superstep; + } + + /** * Set superstep number used. Internally sets the group for metrics created. * * @param superstep long number of superstep to use. @@ -66,11 +74,4 @@ public class SuperstepMetricsRegistry extends GiraphMetricsRegistry { public void printSummary(PrintStream out) { new WorkerSuperstepMetrics().readFromRegistry().print(superstep, out); } - - /** - * Print human readable summary of superstep metrics. - */ - public void printSummary() { - printSummary(System.out); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java b/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java new file mode 100644 index 0000000..1e7f3a9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/TimerDesc.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.metrics; + +import java.util.concurrent.TimeUnit; + +/** + * Description for Timers used in Giraph + */ +public enum TimerDesc { + + /** Timer around Vertex#compute() */ + COMPUTE_ONE("compute-one", TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + + /** Name of timer */ + private final String name; + /** Duration unit for timer */ + private final TimeUnit durationUnit; + /** Time unit for timer */ + private final TimeUnit timeUnit; + + /** + * Constructor + * @param name String name of timer + * @param durationUnit Duration unit of timer + * @param timeUnit Time unit of timer + */ + private TimerDesc(String name, TimeUnit durationUnit, TimeUnit timeUnit) { + this.name = name; + this.durationUnit = durationUnit; + this.timeUnit = timeUnit; + } + + /** + * Get duration unit of timer + * @return TimeUnit + */ + public TimeUnit getDurationUnit() { + return durationUnit; + } + + /** + * Get name of timer + * @return String name + */ + public String getName() { + return name; + } + + /** + * Get time unit of timer + * @return TimeUnit of timer + */ + public TimeUnit getTimeUnit() { + return timeUnit; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java index 90567e4..e51f96e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java @@ -18,9 +18,8 @@ package org.apache.giraph.metrics; -import org.apache.giraph.worker.BspServiceWorker; -import org.apache.giraph.graph.ComputeCallable; import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.worker.BspServiceWorker; import 
org.apache.hadoop.io.Writable; import com.yammer.metrics.core.Gauge; @@ -75,8 +74,7 @@ public class WorkerSuperstepMetrics implements Writable { readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg); readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer); readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer); - userComputeTime.setValue((long) ssm.getTimer( - ComputeCallable.TIMER_COMPUTE_ONE).sum()); + userComputeTime.setValue((long) ssm.getTimer(TimerDesc.COMPUTE_ONE).sum()); return this; } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java index eec8388..b5ebb10 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java @@ -18,9 +18,11 @@ package org.apache.giraph.utils; -import com.yammer.metrics.util.PercentGauge; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; +import org.apache.giraph.metrics.MetricNames; + +import com.yammer.metrics.util.PercentGauge; /** * Helper static methods for tracking memory usage. @@ -67,7 +69,7 @@ public class MemoryUtils { */ public static void initMetrics() { GiraphMetricsRegistry metrics = GiraphMetrics.get().perJob(); - metrics.getGauge("memory-free-pct", new PercentGauge() { + metrics.getGauge(MetricNames.MEMORY_FREE_PERCENT, new PercentGauge() { @Override protected double getNumerator() { return freeMemoryMB(); @@ -87,8 +89,7 @@ public class MemoryUtils { * @return String of all Runtime stats. 
*/ public static String getRuntimeMemoryStats() { - return "totalMem = " + totalMemoryMB() + - "M, maxMem = " + maxMemoryMB() + - "M, freeMem = " + freeMemoryMB() + "M"; + return String.format("Memory (free/total/max) = %.2fM / %.2fM / %.2fM", + freeMemoryMB(), totalMemoryMB(), maxMemoryMB()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index e48e01a..fa3ab49 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -1015,7 +1015,7 @@ else[HADOOP_NON_SECURE]*/ } if (getConfiguration().metricsEnabled()) { - GiraphMetrics.get().dumpToStdout(); + GiraphMetrics.get().dumpToStream(System.err); } // Preferably would shut down the service only after http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 3e2dc66..bdf9f57 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -24,8 +24,6 @@ import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.EdgeReader; -import org.apache.giraph.metrics.GiraphMetrics; -import org.apache.giraph.metrics.GiraphMetricsRegistry; import 
org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.zk.ZooKeeperExt; @@ -36,7 +34,7 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Meter; import java.io.IOException; @@ -53,17 +51,18 @@ import java.io.IOException; public class EdgeInputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends InputSplitsCallable<I, V, E, M> { + /** How often to update metrics and print info */ + public static final int VERTICES_UPDATE_PERIOD = 1000000; + /** Class logger */ private static final Logger LOG = Logger.getLogger( EdgeInputSplitsCallable.class); - /** Total edges loaded */ - private long totalEdgesLoaded = 0; /** Input split max edges (-1 denotes all) */ private final long inputSplitMaxEdges; // Metrics - /** number of edges loaded counter */ - private final Counter edgesLoadedCounter; + /** edges loaded meter across all readers */ + private final Meter totalEdgesMeter; /** * Constructor. 
@@ -88,8 +87,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, inputSplitMaxEdges = configuration.getInputSplitMaxEdges(); // Initialize Metrics - GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob(); - edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED); + totalEdgesMeter = getTotalEdgesLoadedMeter(); } /** @@ -137,11 +135,13 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, context.progress(); // do this before potential data transfer ++inputSplitEdgesLoaded; - // Update status every 1M edges - if (((inputSplitEdgesLoaded + totalEdgesLoaded) % 1000000) == 0) { + // Update status every VERTICES_UPDATE_PERIOD edges + if (inputSplitEdgesLoaded % VERTICES_UPDATE_PERIOD == 0) { + totalEdgesMeter.mark(VERTICES_UPDATE_PERIOD); LoggerUtils.setStatusAndLog(context, LOG, Level.INFO, - "readInputSplit: Loaded " + - (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " + + "readEdgeInputSplit: Loaded " + + totalEdgesMeter.count() + " edges at " + + totalEdgesMeter.meanRate() + " edges/sec " + MemoryUtils.getRuntimeMemoryStats()); } @@ -158,8 +158,6 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, } } edgeReader.close(); - totalEdgesLoaded += inputSplitEdgesLoaded; - edgesLoadedCounter.inc(inputSplitEdgesLoaded); return new VertexEdgeCount(0, inputSplitEdgesLoaded); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index 5487ab7..0ec20fd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -23,6 +23,8 @@ import 
org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; +import org.apache.giraph.metrics.GiraphMetrics; +import org.apache.giraph.metrics.MeterDesc; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; import org.apache.giraph.time.Times; @@ -36,6 +38,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; +import com.yammer.metrics.core.Meter; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; @@ -54,10 +58,6 @@ import java.util.concurrent.Callable; public abstract class InputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> implements Callable<VertexEdgeCount> { - /** Name of counter for vertices loaded */ - public static final String COUNTER_VERTICES_LOADED = "vertices-loaded"; - /** Name of counter for edges loaded */ - public static final String COUNTER_EDGES_LOADED = "edges-loaded"; /** Class logger */ private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class); /** Class time object */ @@ -118,6 +118,24 @@ public abstract class InputSplitsCallable<I extends WritableComparable, // CHECKSTYLE: resume ParameterNumberCheck /** + * Get Meter tracking edges loaded + * + * @return Meter tracking edges loaded + */ + public static Meter getTotalEdgesLoadedMeter() { + return GiraphMetrics.get().perJob().getMeter(MeterDesc.EDGES_LOADED); + } + + /** + * Get Meter tracking number of vertices loaded. + * + * @return Meter for vertices loaded + */ + public static Meter getTotalVerticesLoadedMeter() { + return GiraphMetrics.get().perJob().getMeter(MeterDesc.VERTICES_LOADED); + } + + /** * Load vertices/edges from the given input split. 
* * @param inputSplit Input split to load http://git-wip-us.apache.org/repos/asf/giraph/blob/8ed06fe5/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index a192aeb..de1ae59 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -18,15 +18,11 @@ package org.apache.giraph.worker; -import com.yammer.metrics.core.Counter; -import java.io.IOException; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.VertexEdgeCount; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexReader; -import org.apache.giraph.metrics.GiraphMetrics; -import org.apache.giraph.metrics.GiraphMetricsRegistry; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; @@ -39,6 +35,10 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import com.yammer.metrics.core.Meter; + +import java.io.IOException; + /** * Load as many vertex input splits as possible. 
* Every thread will has its own instance of WorkerClientRequestProcessor @@ -52,23 +52,21 @@ import org.apache.log4j.Logger; public class VertexInputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends InputSplitsCallable<I, V, E, M> { + /** How often to update metrics and print info */ + public static final int VERTICES_UPDATE_PERIOD = 250000; /** Class logger */ private static final Logger LOG = Logger.getLogger(VertexInputSplitsCallable.class); - /** Total vertices loaded */ - private long totalVerticesLoaded = 0; - /** Total edges loaded */ - private long totalEdgesLoaded = 0; /** Input split max vertices (-1 denotes all) */ private final long inputSplitMaxVertices; /** Bsp service worker (only use thread-safe methods) */ private final BspServiceWorker<I, V, E, M> bspServiceWorker; // Metrics - /** number of vertices loaded counter */ - private final Counter verticesLoadedCounter; - /** number of edges loaded counter */ - private final Counter edgesLoadedCounter; + /** number of vertices loaded meter across all readers */ + private final Meter totalVerticesMeter; + /** number of edges loaded meter across all readers */ + private final Meter totalEdgesMeter; /** * Constructor. 
@@ -94,9 +92,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable, this.bspServiceWorker = bspServiceWorker; // Initialize Metrics - GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob(); - verticesLoadedCounter = jobMetrics.getCounter(COUNTER_VERTICES_LOADED); - edgesLoadedCounter = jobMetrics.getCounter(COUNTER_EDGES_LOADED); + totalVerticesMeter = getTotalVerticesLoadedMeter(); + totalEdgesMeter = getTotalEdgesLoadedMeter(); } /** @@ -120,9 +117,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable, vertexInputFormat.createVertexReader(inputSplit, context); vertexReader.initialize(inputSplit, context); long inputSplitVerticesLoaded = 0; + long edgesSinceLastUpdate = 0; long inputSplitEdgesLoaded = 0; - long nextPrintVertices = 0; - long nextPrintMsecs = System.currentTimeMillis() + 15000; while (vertexReader.nextVertex()) { Vertex<I, V, E, M> readerVertex = vertexReader.getCurrentVertex(); @@ -143,22 +139,23 @@ public class VertexInputSplitsCallable<I extends WritableComparable, partitionOwner, readerVertex); context.progress(); // do this before potential data transfer ++inputSplitVerticesLoaded; - inputSplitEdgesLoaded += readerVertex.getNumEdges(); + edgesSinceLastUpdate += readerVertex.getNumEdges(); + + // Update status every VERTICES_UPDATE_PERIOD vertices + if (inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD == 0) { + totalVerticesMeter.mark(VERTICES_UPDATE_PERIOD); + totalEdgesMeter.mark(edgesSinceLastUpdate); + inputSplitEdgesLoaded += edgesSinceLastUpdate; + edgesSinceLastUpdate = 0; - // Update status at most every 250k vertices or 15 seconds - if ((inputSplitVerticesLoaded + totalVerticesLoaded) > - nextPrintVertices && - System.currentTimeMillis() > nextPrintMsecs) { LoggerUtils.setStatusAndLog( context, LOG, Level.INFO, - "readInputSplit: Loaded " + - (inputSplitVerticesLoaded + totalVerticesLoaded) + - " vertices " + - (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " + + 
"readVertexInputSplit: Loaded " + + totalVerticesMeter.count() + " vertices at " + + totalVerticesMeter.meanRate() + " vertices/sec " + + totalEdgesMeter.count() + " edges at " + + totalEdgesMeter.meanRate() + " edges/sec " + MemoryUtils.getRuntimeMemoryStats()); - nextPrintMsecs = System.currentTimeMillis() + 15000; - nextPrintVertices = inputSplitVerticesLoaded + totalVerticesLoaded + - 250000; } // For sampling, or to limit outlier input splits, the number of @@ -174,12 +171,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable, } } vertexReader.close(); - totalVerticesLoaded += inputSplitVerticesLoaded; - verticesLoadedCounter.inc(inputSplitVerticesLoaded); - totalEdgesLoaded += inputSplitEdgesLoaded; - edgesLoadedCounter.inc(inputSplitEdgesLoaded); - return new VertexEdgeCount( - inputSplitVerticesLoaded, inputSplitEdgesLoaded); + return new VertexEdgeCount(inputSplitVerticesLoaded, + inputSplitEdgesLoaded + edgesSinceLastUpdate); } }
