Updated Branches: refs/heads/trunk 110d77c55 -> df64dd7b8
GIRAPH-698: Expose Computation to a user (aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/df64dd7b Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/df64dd7b Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/df64dd7b Branch: refs/heads/trunk Commit: df64dd7b8094864b3857d95309b1278c48be7322 Parents: 110d77c Author: Avery Ching <[email protected]> Authored: Wed Jun 26 14:07:16 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Wed Jun 26 19:01:53 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../apache/giraph/bsp/CentralizedServiceMaster.java | 8 ++++++++ .../org/apache/giraph/counters/GiraphTimers.java | 15 +++++++++------ .../org/apache/giraph/master/BspServiceMaster.java | 13 ++++++++++++- .../org/apache/giraph/master/MasterCompute.java | 10 ++++++++++ .../java/org/apache/giraph/master/MasterThread.java | 9 +++++++-- .../org/apache/giraph/master/SuperstepClasses.java | 12 +++++++++--- .../giraph/master/TestComputationCombinerTypes.java | 16 ++++++++-------- 8 files changed, 65 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 5804cab..23a6b71 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-698: Expose Computation to a user (aching) + GIRAPH-311: Master halting in superstep 0 is ignored by workers (majakabiljo) GIRAPH-688: Make sure Giraph builds against all compatible YARN-enabled Hadoop versions, http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index f41fc3d..999888d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -19,6 +19,7 @@ package org.apache.giraph.bsp; import org.apache.giraph.master.MasterAggregatorHandler; +import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -123,6 +124,13 @@ public interface CentralizedServiceMaster<I extends WritableComparable, MasterAggregatorHandler getAggregatorHandler(); /** + * Get MasterCompute object + * + * @return MasterCompute object + */ + MasterCompute getMasterCompute(); + + /** * Superstep has finished. */ void postSuperstep(); http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java index 0d50c29..cbf2470 100644 --- a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java +++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphTimers.java @@ -34,11 +34,11 @@ public class GiraphTimers extends HadoopCountersBase { /** Counter group name for the giraph timers */ public static final String GROUP_NAME = "Giraph Timers"; /** Counter name for setup msec */ - public static final String SETUP_MS_NAME = "Setup (milliseconds)"; + public static final String SETUP_MS_NAME = "Setup (ms)"; /** Counter name for total msec */ - public static final String TOTAL_MS_NAME = "Total (milliseconds)"; + public static final String TOTAL_MS_NAME = "Total (ms)"; /** Counter name for shutdown msec */ - public static final String SHUTDOWN_MS_NAME = "Shutdown (milliseconds)"; + public static final String SHUTDOWN_MS_NAME = "Shutdown (ms)"; /** Singleton instance for everyone to use */ private static GiraphTimers INSTANCE; @@ -103,18 +103,21 @@ public class GiraphTimers extends HadoopCountersBase { * Get counter for superstep time in milliseconds * * @param superstep Integer superstep number. + * @param computationName Name of the computation for display (may be null) * @return Counter for setup time in milliseconds */ - public GiraphHadoopCounter getSuperstepMs(long superstep) { + public GiraphHadoopCounter getSuperstepMs(long superstep, + String computationName) { GiraphHadoopCounter counter = superstepMsec.get(superstep); if (counter == null) { String counterPrefix; if (superstep == -1) { counterPrefix = "Input superstep"; } else { - counterPrefix = "Superstep " + superstep; + counterPrefix = "Superstep " + superstep + + (computationName == null ? "" : " " + computationName); } - counter = getCounter(counterPrefix + " (milliseconds)"); + counter = getCounter(counterPrefix + " (ms)"); superstepMsec.put(superstep, counter); } return counter; http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 3558887..1d3cff0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -717,6 +717,11 @@ public class BspServiceMaster<I extends WritableComparable, return aggregatorHandler; } + @Override + public MasterCompute getMasterCompute() { + return masterCompute; + } + /** * Read the finalized checkpoint file and associated metadata files for the * checkpoint. Modifies the {@link PartitionOwner} objects to get the @@ -1636,7 +1641,13 @@ public class BspServiceMaster<I extends WritableComparable, globalStats.setHaltComputation(true); } - superstepClasses.verifyTypesMatch(getConfiguration()); + // Superstep 0 doesn't need to have matching types (Message types may not + // match) and if the computation is halted, no need to check any of + // the types. + if (!globalStats.getHaltComputation()) { + superstepClasses.verifyTypesMatch( + getConfiguration(), getSuperstep() != 0); + } getConfiguration().updateSuperstepClasses(superstepClasses); // Let everyone know the aggregated application state through the http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 310cb26..a3427dc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -134,6 +134,11 @@ public abstract class MasterCompute * @return Computation class */ public final Class<? extends Computation> getComputation() { + // Might be called prior to classes being set, do not return NPE + if (superstepClasses == null) { + return null; + } + return superstepClasses.getComputationClass(); } @@ -152,6 +157,11 @@ public abstract class MasterCompute * @return Combiner class */ public final Class<? extends Combiner> getCombiner() { + // Might be called prior to classes being set, do not return NPE + if (superstepClasses == null) { + return null; + } + return superstepClasses.getCombinerClass(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java index e8eeeed..ec1733c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java @@ -23,6 +23,7 @@ import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.SuperstepState; import org.apache.giraph.counters.GiraphTimers; +import org.apache.giraph.graph.Computation; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -109,6 +110,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable, long startSuperstepMillis = System.currentTimeMillis(); cachedSuperstep = bspServiceMaster.getSuperstep(); GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep); + Class<? extends Computation> computationClass = + bspServiceMaster.getMasterCompute().getComputation(); superstepState = bspServiceMaster.coordinateSuperstep(); long superstepMillis = System.currentTimeMillis() - startSuperstepMillis; @@ -123,8 +126,10 @@ public class MasterThread<I extends WritableComparable, V extends Writable, bspServiceMaster.getSuperstep()); } if (superstepCounterOn) { - GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep). - increment(superstepMillis); + String computationName = (computationClass == null) ? + null : computationClass.getSimpleName(); + GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep, + computationName).increment(superstepMillis); } bspServiceMaster.postSuperstep(); http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java index 13bb492..7a7df05 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java @@ -93,8 +93,11 @@ public class SuperstepClasses implements Writable { * don't match an {@link IllegalStateException} will be thrown. * * @param conf Configuration to verify this with + * @param checkMatchingMesssageTypes Check that the incoming/outgoing + * message types match */ - public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf) { + public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf, + boolean checkMatchingMesssageTypes) { // In some cases, for example when using Jython, the Computation class may // not be set. This is because it is created by a ComputationFactory // dynamically and not known ahead of time. In this case there is nothing to @@ -111,8 +114,11 @@ public class SuperstepClasses implements Writable { "Vertex value", computationClass); verifyTypes(conf.getEdgeValueClass(), computationTypes[2], "Edge value", computationClass); - verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3], - "Previous outgoing and new incoming message", computationClass); + + if (checkMatchingMesssageTypes) { + verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3], + "Previous outgoing and new incoming message", computationClass); + } Class<?> outgoingMessageType = computationTypes[4]; if (outgoingMessageType.isInterface()) { throw new IllegalStateException("verifyTypesMatch: " + http://git-wip-us.apache.org/repos/asf/giraph/blob/df64dd7b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java index 8ae09bc..f9bd4e4 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java @@ -39,7 +39,7 @@ public class TestComputationCombinerTypes { public void testAllMatchWithoutCombiner() { SuperstepClasses classes = new SuperstepClasses(IntNoOpComputation.class, null); - classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class)); + classes.verifyTypesMatch(createConfiguration(IntNoOpComputation.class), true); } @Test @@ -48,7 +48,7 @@ public class TestComputationCombinerTypes { new SuperstepClasses(IntIntIntLongDoubleComputation.class, IntDoubleCombiner.class); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -56,7 +56,7 @@ public class TestComputationCombinerTypes { SuperstepClasses classes = new SuperstepClasses(LongIntIntLongIntComputation.class, null); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -64,7 +64,7 @@ public class TestComputationCombinerTypes { SuperstepClasses classes = new SuperstepClasses(IntLongIntLongIntComputation.class, null); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -72,7 +72,7 @@ public class TestComputationCombinerTypes { SuperstepClasses classes = new SuperstepClasses(IntIntLongLongIntComputation.class, null); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -80,7 +80,7 @@ public class TestComputationCombinerTypes { SuperstepClasses classes = new SuperstepClasses(IntIntIntIntLongComputation.class, null); classes.verifyTypesMatch( - createConfiguration(IntIntIntLongDoubleComputation.class)); + createConfiguration(IntIntIntLongDoubleComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -89,7 +89,7 @@ public class TestComputationCombinerTypes { new SuperstepClasses(IntIntIntLongDoubleComputation.class, DoubleDoubleCombiner.class); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } @Test(expected = IllegalStateException.class) @@ -98,7 +98,7 @@ public class TestComputationCombinerTypes { new SuperstepClasses(IntIntIntLongDoubleComputation.class, IntLongCombiner.class); classes.verifyTypesMatch( - createConfiguration(IntIntIntIntLongComputation.class)); + createConfiguration(IntIntIntIntLongComputation.class), true); } private static ImmutableClassesGiraphConfiguration createConfiguration(
