Repository: giraph
Updated Branches:
  refs/heads/trunk 62b714547 -> 39237cf7f
Fix removal of MasterLoggingAggregator

Test Plan:
mvn clean install

How should I test this?

Reviewers: pavanka, maja.kabiljo

Reviewed By: maja.kabiljo

Differential Revision: https://reviews.facebook.net/D27651

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/39237cf7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/39237cf7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/39237cf7

Branch: refs/heads/trunk
Commit: 39237cf7fbf0742a8d7200db4039e5b43a58eca8
Parents: 62b7145
Author: Igor Kabiljo <[email protected]>
Authored: Tue Oct 28 14:56:08 2014 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Tue Oct 28 15:04:45 2014 -0700

----------------------------------------------------------------------
 .../apache/giraph/benchmark/AggregatorsBenchmark.java | 13 ++++++++-----
 .../master/AggregatorToGlobalCommTranslation.java | 7 +++++++
 .../org/apache/giraph/master/BspServiceMaster.java | 2 ++
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/39237cf7/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
index 539bd7d..850e3ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
@@ -18,15 +18,19 @@ package org.apache.giraph.benchmark; +import java.io.IOException; +import java.util.Set; + import org.apache.commons.cli.CommandLine; import org.apache.giraph.aggregators.LongSumAggregator; -import org.apache.giraph.graph.BasicComputation; import
org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; import org.apache.giraph.master.DefaultMasterCompute; -import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.MasterLoggingAggregator; import org.apache.giraph.worker.DefaultWorkerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; @@ -35,9 +39,6 @@ import org.apache.hadoop.util.ToolRunner; import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Set; - /** * Benchmark for aggregators. Also checks the correctness. */ @@ -123,6 +124,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark { public void preSuperstep() { addToWorkerAggregators(1); checkAggregators(); + MasterLoggingAggregator.aggregate("everything fine", this, getConf()); } @Override @@ -214,6 +216,7 @@ public class AggregatorsBenchmark extends GiraphBenchmark { conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1); conf.setInt(AGGREGATORS_NUM, AGGREGATORS.getOptionIntValue(cmd)); conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1)); + MasterLoggingAggregator.setUseMasterLoggingAggregator(true, conf); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/39237cf7/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java index 36a4553..c13d7bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.MasterLoggingAggregator; import org.apache.hadoop.io.Writable; import org.apache.log4j.Logger; @@ -71,6 +72,7 @@ public class AggregatorToGlobalCommTranslation MasterGlobalCommUsage globalComm) { this.conf = conf; this.globalComm = globalComm; + MasterLoggingAggregator.registerAggregator(this, conf); } @Override @@ -143,6 +145,11 @@ public class AggregatorToGlobalCommTranslation initAggregatorValues.clear(); } + /** Prepare before calling master compute */ + public void prepareSuperstep() { + MasterLoggingAggregator.logAggregatedValue(this, conf); + } + @Override public <A extends Writable> boolean registerAggregator(String name, Class<? extends Aggregator<A>> aggregatorClass) throws http://git-wip-us.apache.org/repos/asf/giraph/blob/39237cf7/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 62b089c..39b4a1c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1668,6 +1668,8 @@ public class BspServiceMaster<I extends WritableComparable, // Collect aggregator values, then run the master.compute() and // finally save the aggregator values globalCommHandler.prepareSuperstep(); + aggregatorTranslation.prepareSuperstep(); + SuperstepClasses superstepClasses = prepareMasterCompute(getSuperstep() + 1); doMasterCompute();
