Updated Branches: refs/heads/trunk 0e1dd3291 -> f8ee22a11
GIRAPH-491: Observer for job lifecycle (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f8ee22a1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f8ee22a1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f8ee22a1 Branch: refs/heads/trunk Commit: f8ee22a119366d7c48cbbfd5f985d4811c0b3794 Parents: 0e1dd32 Author: Nitay Joffe <[email protected]> Authored: Thu Jan 24 16:32:28 2013 -0800 Committer: Nitay Joffe <[email protected]> Committed: Thu Jan 24 20:38:06 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 31 +++++++-- .../org/apache/giraph/conf/GiraphConstants.java | 3 + .../conf/ImmutableClassesGiraphConfiguration.java | 23 ++++++- .../apache/giraph/graph/DefaultJobObserver.java | 52 +++++++++++++++ .../java/org/apache/giraph/graph/GiraphJob.java | 18 ++++-- .../org/apache/giraph/graph/GiraphJobObserver.java | 39 +++++++++++ .../org/apache/giraph/utils/ReflectionUtils.java | 5 +- 8 files changed, 156 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 2d429f9..27669ba 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-491: Observer for job lifecycle (nitay) + GIRAPH-490: Constants for GiraphStats / GiraphTimers (nitay) GIRAPH-488: ArrayOutOfBoundsException in org.apache.giraph.worker.InputSplitPathOrganizer (ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index df7b80e..3e29a83 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -20,16 +20,18 @@ package org.apache.giraph.conf; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.graph.DefaultJobObserver; +import org.apache.giraph.graph.GiraphJobObserver; +import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.io.EdgeInputFormat; -import org.apache.giraph.master.MasterCompute; -import org.apache.giraph.vertex.Vertex; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; -import org.apache.giraph.graph.VertexResolver; -import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.master.MasterObserver; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.Partition; -import org.apache.giraph.master.MasterObserver; +import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.worker.WorkerContext; import org.apache.hadoop.conf.Configuration; /** @@ -110,6 +112,25 @@ public class GiraphConfiguration extends Configuration } /** + * Get job observer class + * + * @return GiraphJobObserver class set. + */ + public Class<? extends GiraphJobObserver> getJobObserverClass() { + return getClass(JOB_OBSERVER_CLASS, DefaultJobObserver.class, + GiraphJobObserver.class); + } + + /** + * Set job observer class + * + * @param klass GiraphJobObserver class to set. + */ + public void setJobObserverClass(Class<? extends GiraphJobObserver> klass) { + setClass(JOB_OBSERVER_CLASS, klass, GiraphJobObserver.class); + } + + /** * Add a class to a property that is a list of classes. If the property does * not exist it will be created. * http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 8e75e5b..51415c2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -43,6 +43,9 @@ public interface GiraphConstants { String GRAPH_PARTITIONER_FACTORY_CLASS = "giraph.graphPartitionerFactoryClass"; + /** Observer class to watch over job status - optional */ + String JOB_OBSERVER_CLASS = "giraph.jobObserverClass"; + // At least one of the input format classes is required. /** VertexInputFormat class */ String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass"; http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index f2c8701..1f47039 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -20,6 +20,7 @@ package org.apache.giraph.conf; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.graph.GiraphJobObserver; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.graph.GraphState; import org.apache.giraph.master.MasterCompute; @@ -57,8 +58,8 @@ import org.apache.hadoop.util.Progressable; * @param <M> Message data */ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> extends - GiraphConfiguration { + V extends Writable, E extends Writable, M extends Writable> + extends GiraphConfiguration { /** Master graph partitioner - cached for fast access */ protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner; @@ -89,6 +90,16 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Configure an object with this instance if the object is configurable. + * @param obj Object + */ + public void configureIfPossible(Object obj) { + if (obj instanceof ImmutableClassesGiraphConfigurable) { + ((ImmutableClassesGiraphConfigurable) obj).setConf(this); + } + } + + /** * Get the user's subclassed * {@link org.apache.giraph.partition.GraphPartitionerFactory}. * @@ -406,6 +417,14 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create job observer + * @return GiraphJobObserver set in configuration. + */ + public GiraphJobObserver getJobObserver() { + return ReflectionUtils.newInstance(getJobObserverClass(), this); + } + + /** * Get the user's subclassed edge value class. * * @return User's vertex edge value class http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java new file mode 100644 index 0000000..9f0418a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultJobObserver.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.mapreduce.Job; + +/** + * Default implementation of job observer that does nothing. + */ +public class DefaultJobObserver implements GiraphJobObserver, + ImmutableClassesGiraphConfigurable { + /** configuration object stored here */ + private ImmutableClassesGiraphConfiguration conf; + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + this.conf = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return this.conf; + } + + @Override + public void launchingJob(Job jobToSubmit) { + // do nothing + } + + @Override + public void jobFinished(Job jobToSubmit, boolean passed) { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java index 6e93a2f..18c0db2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJob.java @@ -264,18 +264,24 @@ public class GiraphJob { } // Set the job properties, check them, and submit the job - ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration = + ImmutableClassesGiraphConfiguration conf = new ImmutableClassesGiraphConfiguration(giraphConfiguration); - checkConfiguration(immutableClassesGiraphConfiguration); - checkLocalJobRunnerConfiguration(immutableClassesGiraphConfiguration); - Job submittedJob = new Job(immutableClassesGiraphConfiguration, jobName); + checkConfiguration(conf); + checkLocalJobRunnerConfiguration(conf); + Job submittedJob = new Job(conf, jobName); if (submittedJob.getJar() == null) { - submittedJob.setJarByClass(GiraphJob.class); + submittedJob.setJarByClass(getClass()); } submittedJob.setNumReduceTasks(0); submittedJob.setMapperClass(GraphMapper.class); submittedJob.setInputFormatClass(BspInputFormat.class); submittedJob.setOutputFormatClass(BspOutputFormat.class); - return submittedJob.waitForCompletion(verbose); + + GiraphJobObserver jobObserver = conf.getJobObserver(); + jobObserver.launchingJob(submittedJob); + boolean passed = submittedJob.waitForCompletion(verbose); + jobObserver.jobFinished(submittedJob, passed); + + return passed; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java new file mode 100644 index 0000000..7aec8c2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphJobObserver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import org.apache.hadoop.mapreduce.Job; + +/** + * An observer over the job launch lifecycle. + */ +public interface GiraphJobObserver { + /** + * Callback for job about to start. + * @param jobToSubmit Job we're going to submit to hadoop. + */ + void launchingJob(Job jobToSubmit); + + /** + * Callback when job finishes. + * @param submittedJob Job that ran in hadoop. + * @param passed true if job succeeded. + */ + void jobFinished(Job submittedJob, boolean passed); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f8ee22a1/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java index d0a4a15..ae2c556 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java @@ -18,7 +18,6 @@ package org.apache.giraph.utils; -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import java.lang.reflect.Array; @@ -184,9 +183,7 @@ public class ReflectionUtils { "newInstance: Illegal access " + theClass.getName(), e); } if (configuration != null) { - if (result instanceof ImmutableClassesGiraphConfigurable) { - ((ImmutableClassesGiraphConfigurable) result).setConf(configuration); - } + configuration.configureIfPossible(result); } return result; }
