Updated Branches: refs/heads/trunk 2353e1adf -> 928c1092b
GIRAPH-527: readVertexInputSplit is always reporting 0 vertices and 0 edges (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/928c1092 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/928c1092 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/928c1092 Branch: refs/heads/trunk Commit: 928c1092b90084e93cd2cfe61ca3ce392a6e8c01 Parents: 2353e1a Author: Nitay Joffe <[email protected]> Authored: Tue Apr 9 22:37:07 2013 +0300 Committer: Nitay Joffe <[email protected]> Committed: Tue Apr 9 22:37:35 2013 +0300 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/graph/GraphTaskManager.java | 2 +- .../org/apache/giraph/metrics/GiraphMetrics.java | 41 ++++++++---- .../giraph/metrics/GiraphMetricsRegistry.java | 54 +++++++++++--- .../giraph/metrics/SuperstepMetricsRegistry.java | 39 +++++++++-- .../java/org/apache/giraph/utils/MemoryUtils.java | 2 +- .../apache/giraph/worker/InputSplitsCallable.java | 6 +- 7 files changed, 111 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 0843773..23ce3d9 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-527: readVertexInputSplit is always reporting 0 vertices and 0 edges (nitay) + GIRAPH-611: Vertex/EdgeReaderWrapper should configure inner reader (majakabiljo) GIRAPH-609: More information on runtime exceptions for Callables (aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 9823532..abca4c4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -661,7 +661,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * Initialize job-level metrics used by this class. */ private void initJobMetrics() { - GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob(); + GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJobOptional(); wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app", TimeUnit.MILLISECONDS); wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app", http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java index d104ec1..0454a5e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetrics.java @@ -18,13 +18,14 @@ package org.apache.giraph.metrics; import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.bsp.BspService; import com.google.common.collect.Lists; import java.io.PrintStream; import java.util.List; +import static org.apache.giraph.bsp.BspService.INPUT_SUPERSTEP; + /** * Top level metrics class for using Yammer's metrics in Giraph. */ @@ -35,8 +36,11 @@ public class GiraphMetrics { /** registry for per-superstep metrics */ private final SuperstepMetricsRegistry perSuperstep; - /** registry for per-job metrics */ - private final GiraphMetricsRegistry perJob; + /** registry for optional per-job metrics */ + private final GiraphMetricsRegistry perJobOptional; + + /** registry for required per-job metrics */ + private final GiraphMetricsRegistry perJobRequired; /** observer for per-superstep metrics re-initialization */ private final List<ResetSuperstepMetricsObserver> observers = @@ -46,8 +50,9 @@ public class GiraphMetrics { * Initialize no-op registry that creates no-op metrics. */ private GiraphMetrics() { - perJob = new GiraphMetricsRegistry(); - perSuperstep = new SuperstepMetricsRegistry(); + perJobOptional = GiraphMetricsRegistry.createFake(); + perSuperstep = SuperstepMetricsRegistry.createFake(); + perJobRequired = GiraphMetricsRegistry.createWithOptional("giraph", "job"); } /** @@ -56,9 +61,9 @@ public class GiraphMetrics { * @param conf GiraphConfiguration to use. */ private GiraphMetrics(GiraphConfiguration conf) { - perJob = new GiraphMetricsRegistry(conf, "giraph", "job"); - perSuperstep = new SuperstepMetricsRegistry(conf, - BspService.INPUT_SUPERSTEP); + perJobOptional = GiraphMetricsRegistry.create(conf, "giraph", "job"); + perSuperstep = SuperstepMetricsRegistry.create(conf, INPUT_SUPERSTEP); + perJobRequired = GiraphMetricsRegistry.createWithOptional("giraph", "job"); } /** @@ -80,12 +85,21 @@ public class GiraphMetrics { } /** - * Get per-job metrics. + * Get per-job optional metrics. + * + * @return per-job optional {@link GiraphMetricsRegistry} + */ + public GiraphMetricsRegistry perJobOptional() { + return perJobOptional; + } + + /** + * Get per-job required metrics. * - * @return per-job GiraphMetricsRegistry + * @return per-job require {@link GiraphMetricsRegistry} */ - public GiraphMetricsRegistry perJob() { - return perJob; + public GiraphMetricsRegistry perJobRequired() { + return perJobRequired; } /** @@ -128,7 +142,8 @@ public class GiraphMetrics { * @param out PrintStream to dump to. */ public void dumpToStream(PrintStream out) { - perJob.printToStream(out); + perJobOptional.printToStream(out); + perJobRequired.printToStream(out); perSuperstep.printToStream(out); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java index 355e510..3c61872 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/GiraphMetricsRegistry.java @@ -50,31 +50,61 @@ public class GiraphMetricsRegistry { private final JmxReporter jmxReporter; /** + * Constructor + * @param registry {@link MetricsRegistry} to use + * @param reporter {@link JmxReporter} to use + * @param groupName String grouping for metrics + * @param type String type name for metrics + */ + protected GiraphMetricsRegistry(MetricsRegistry registry, + JmxReporter reporter, String groupName, String type) { + this.registry = registry; + this.jmxReporter = reporter; + this.groupName = groupName; + this.type = type; + if (jmxReporter != null) { + jmxReporter.start(); + } + } + + /** * Create no-op empty registry that makes no-op metrics. + * @return fake registry that makes no-op metrics + */ + public static GiraphMetricsRegistry createFake() { + return new GiraphMetricsRegistry(new NoOpMetricsRegistry(), null, "", ""); + } + + /** + * Create registry with group to use for metrics. + * + * @param groupName String group to use for metrics. + * @param type String type to use for metrics. + * @return new metrics registry */ - public GiraphMetricsRegistry() { - registry = new NoOpMetricsRegistry(); - jmxReporter = null; + public static GiraphMetricsRegistry createWithOptional(String groupName, + String type) { + MetricsRegistry registry = new MetricsRegistry(); + return new GiraphMetricsRegistry(registry, new JmxReporter(registry), + groupName, type); } /** * Create registry with Hadoop Configuration and group to use for metrics. + * Checks the configuration object for whether the optional metrics are + * enabled, and optionally creates those. * * @param conf Hadoop Configuration to use. * @param groupName String group to use for metrics. * @param type String type to use for metrics. + * @return new metrics registry */ - public GiraphMetricsRegistry(GiraphConfiguration conf, String groupName, - String type) { - this.groupName = groupName; - this.type = type; + public static GiraphMetricsRegistry create(GiraphConfiguration conf, + String groupName, String type) { if (conf.metricsEnabled()) { - registry = new MetricsRegistry(); - jmxReporter = new JmxReporter(registry); - jmxReporter.start(); + return createWithOptional(groupName, type); } else { - registry = new NoOpMetricsRegistry(); - jmxReporter = null; + return createFake(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java index 57b858e..c58a486 100644 --- a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java +++ b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java @@ -21,6 +21,9 @@ package org.apache.giraph.metrics; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.bsp.BspService; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.reporting.JmxReporter; + import java.io.PrintStream; /** @@ -31,10 +34,15 @@ public class SuperstepMetricsRegistry extends GiraphMetricsRegistry { private long superstep = BspService.INPUT_SUPERSTEP; /** - * Create no-op registry that creates no-op metrics. + * Constructor + * @param registry {@link com.yammer.metrics.core.MetricsRegistry} to use + * @param reporter {@link com.yammer.metrics.reporting.JmxReporter} to use + * @param groupName String grouping for metrics + * @param type String type name for metrics */ - public SuperstepMetricsRegistry() { - super(); + protected SuperstepMetricsRegistry(MetricsRegistry registry, + JmxReporter reporter, String groupName, String type) { + super(registry, reporter, groupName, type); } /** @@ -42,10 +50,29 @@ public class SuperstepMetricsRegistry extends GiraphMetricsRegistry { * * @param conf Hadoop Configuration to use. * @param superstep number of superstep to use as group for metrics. + * @return new metrics registry */ - public SuperstepMetricsRegistry(GiraphConfiguration conf, long superstep) { - super(conf, "giraph.superstep", String.valueOf(superstep)); - this.superstep = superstep; + public static SuperstepMetricsRegistry create(GiraphConfiguration conf, + long superstep) { + if (conf.metricsEnabled()) { + MetricsRegistry registry = new MetricsRegistry(); + SuperstepMetricsRegistry superstepMetrics = new SuperstepMetricsRegistry( + registry, new JmxReporter(registry), + "giraph.superstep", String.valueOf(superstep)); + superstepMetrics.superstep = superstep; + return superstepMetrics; + } else { + return createFake(); + } + } + + /** + * Create an empty registry + * @return fake metrics registry that returns no op metrics + */ + public static SuperstepMetricsRegistry createFake() { + return new SuperstepMetricsRegistry(new NoOpMetricsRegistry(), null, + "", ""); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java index b5ebb10..072265b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java @@ -68,7 +68,7 @@ public class MemoryUtils { * Initialize metrics tracked by this helper. */ public static void initMetrics() { - GiraphMetricsRegistry metrics = GiraphMetrics.get().perJob(); + GiraphMetricsRegistry metrics = GiraphMetrics.get().perJobOptional(); metrics.getGauge(MetricNames.MEMORY_FREE_PERCENT, new PercentGauge() { @Override protected double getNumerator() { http://git-wip-us.apache.org/repos/asf/giraph/blob/928c1092/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index 0ec20fd..a3a9ab7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -123,7 +123,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * @return Meter tracking edges loaded */ public static Meter getTotalEdgesLoadedMeter() { - return GiraphMetrics.get().perJob().getMeter(MeterDesc.EDGES_LOADED); + return GiraphMetrics.get().perJobRequired() + .getMeter(MeterDesc.EDGES_LOADED); } /** @@ -132,7 +133,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * @return Meter for vertices loaded */ public static Meter getTotalVerticesLoadedMeter() { - return GiraphMetrics.get().perJob().getMeter(MeterDesc.VERTICES_LOADED); + return GiraphMetrics.get().perJobRequired() + .getMeter(MeterDesc.VERTICES_LOADED); } /**
