Updated Branches: refs/heads/trunk 95ce243fd -> 56fcb519a
GIRAPH-582: Create a generic option for determining the number of supersteps that a job runs for (aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/56fcb519 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/56fcb519 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/56fcb519 Branch: refs/heads/trunk Commit: 56fcb519aa56550bb5fdfa666eba275e0f1b0020 Parents: 95ce243 Author: Avery Ching <[email protected]> Authored: Mon Mar 25 12:13:14 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Mon Mar 25 15:46:34 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 + .../apache/giraph/conf/GiraphConfiguration.java | 20 ++++ .../org/apache/giraph/conf/GiraphConstants.java | 9 ++ .../org/apache/giraph/master/BspServiceMaster.java | 19 +++- .../java/org/apache/giraph/TestMaxSuperstep.java | 83 +++++++++++++++ 5 files changed, 132 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 84939a4..b695ae0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-582: Create a generic option for determining the number of + supersteps that a job runs for (aching) + GIRAPH-586: Customizable default vertex value (apresta) GIRAPH-580: NPE in HiveGiraphRunner when the vertex output format is http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 
ae276f6..3b84831 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -751,4 +751,24 @@ public class GiraphConfiguration extends Configuration get(GiraphConstants.DNS_INTERFACE, "default"), get(GiraphConstants.DNS_NAMESERVER, "default")); } + + /** + * Set the maximum number of supersteps of this application. After this + * many supersteps are executed, the application will shut down. + * + * @param maxNumberOfSupersteps Maximum number of supersteps + */ + public void setMaxNumberOfSupersteps(int maxNumberOfSupersteps) { + setInt(MAX_NUMBER_OF_SUPERSTEPS, maxNumberOfSupersteps); + } + + /** + * Get the maximum number of supersteps of this application. After this + * many supersteps are executed, the application will shut down. + * + * @return Maximum number of supersteps + */ + public int getMaxNumberOfSupersteps() { + return getInt(MAX_NUMBER_OF_SUPERSTEPS, MAX_NUMBER_OF_SUPERSTEPS_DEFAULT); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index fe5278b..42f8abc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -684,5 +684,14 @@ public interface GiraphConstants { String DNS_INTERFACE = "giraph.dns.interface"; /** Server for hostname resolution */ String DNS_NAMESERVER = "giraph.dns.nameserver"; + + /** + * The application will halt after this many supersteps are completed. For + * instance, if it is set to 3, the application will run at most 0, 1, + * and 2 supersteps and then go into the shutdown superstep. 
+ */ + String MAX_NUMBER_OF_SUPERSTEPS = "giraph.maxNumberOfSupersteps"; + /** By default, the number of supersteps is not limited */ + int MAX_NUMBER_OF_SUPERSTEPS_DEFAULT = -1; } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 9188a23..6c979d6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -135,6 +135,8 @@ public class BspServiceMaster<I extends WritableComparable, private final int maxWorkers; /** Min number of workers */ private final int minWorkers; + /** Max number of supersteps */ + private final int maxNumberOfSupersteps; /** Min % responded workers */ private final float minPercentResponded; /** Msecs to wait for an event */ @@ -196,8 +198,9 @@ public class BspServiceMaster<I extends WritableComparable, ImmutableClassesGiraphConfiguration<I, V, E, M> conf = getConfiguration(); - maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, -1); - minWorkers = conf.getInt(GiraphConstants.MIN_WORKERS, -1); + maxWorkers = conf.getMaxWorkers(); + minWorkers = conf.getMinWorkers(); + maxNumberOfSupersteps = conf.getMaxNumberOfSupersteps(); minPercentResponded = conf.getFloat( GiraphConstants.MIN_PERCENT_RESPONDED, 100.0f); eventWaitMsecs = conf.getEventWaitMsecs(); @@ -1517,6 +1520,18 @@ public class BspServiceMaster<I extends WritableComparable, globalStats.setHaltComputation(true); } + // If we have completed the maximum number of supersteps, stop + // the computation + if (maxNumberOfSupersteps != + GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS_DEFAULT && + (getSuperstep() == 
maxNumberOfSupersteps - 1)) { + if (LOG.isInfoEnabled()) { + LOG.info("coordinateSuperstep: Finished " + maxNumberOfSupersteps + + " supersteps (max specified by the user), halting"); + } + globalStats.setHaltComputation(true); + } + // Let everyone know the aggregated application state through the // superstep finishing znode. String superstepFinishedNode = http://git-wip-us.apache.org/repos/asf/giraph/blob/56fcb519/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java new file mode 100644 index 0000000..d7ac4e8 --- /dev/null +++ b/giraph-examples/src/test/java/org/apache/giraph/TestMaxSuperstep.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph; + +import java.io.IOException; +import org.apache.giraph.conf.GiraphClasses; +import org.apache.giraph.counters.GiraphHadoopCounter; +import org.apache.giraph.counters.GiraphStats; +import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; +import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexOutputFormat; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.job.GiraphJob; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for testing max superstep feature of Giraph + */ +public class TestMaxSuperstep extends BspCase { + public TestMaxSuperstep() { + super(TestMaxSuperstep.class.getName()); + } + + /** + * Simple test vertex class that will run forever (voteToHalt is never + * called). + */ + public static class InfiniteLoopVertex extends Vertex<LongWritable, + DoubleWritable, FloatWritable, DoubleWritable> { + @Override + public void compute(Iterable<DoubleWritable> messages) throws IOException { + // Do nothing, run forever! 
+ } + } + + /** + * Run a job that tests that this job completes in the desired number of + * supersteps + * + * @throws java.io.IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testMaxSuperstep() + throws IOException, InterruptedException, ClassNotFoundException { + GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> + classes = new GiraphClasses(); + classes.setVertexClass(InfiniteLoopVertex.class); + classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); + classes.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class); + GiraphJob job = prepareJob(getCallingMethodName(), classes, + getTempPath(getCallingMethodName())); + job.getConfiguration().setMaxNumberOfSupersteps(3); + assertTrue(job.run(true)); + if (!runningInDistributedMode()) { + GiraphHadoopCounter superstepCounter = + GiraphStats.getInstance().getSuperstepCounter(); + assertEquals(superstepCounter.getValue(), 3L); + } + } +}
