Repository: giraph Updated Branches: refs/heads/trunk 24bed1a9b -> 093e81995
GIRAPH-1050: Add MapperObserver Summary: Add MapperObserver which will be called once per mapper before anything else happens. Test Plan: Ran a job with MapperObserver set, verified it's called at the right time Differential Revision: https://reviews.facebook.net/D56373 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/093e8199 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/093e8199 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/093e8199 Branch: refs/heads/trunk Commit: 093e81995e94ee6dbfe6278f04d59a29cd54ac8b Parents: 24bed1a Author: Maja Kabiljo <[email protected]> Authored: Thu Apr 7 09:40:27 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu Apr 7 10:49:12 2016 -0700 ---------------------------------------------------------------------- .../apache/giraph/conf/GiraphConfiguration.java | 20 +++++++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 5 ++++ .../ImmutableClassesGiraphConfiguration.java | 15 ++++++++++ .../apache/giraph/graph/GraphTaskManager.java | 13 +++++++++ .../org/apache/giraph/graph/MapperObserver.java | 30 ++++++++++++++++++++ 5 files changed, 83 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 6b00645..78bd5ef 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -33,6 +33,7 @@ import org.apache.giraph.edge.ReuseObjectsOutEdges; import org.apache.giraph.factories.ComputationFactory; import org.apache.giraph.factories.VertexValueFactory; import org.apache.giraph.graph.Computation; +import org.apache.giraph.graph.MapperObserver; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.graph.VertexValueCombiner; @@ -313,6 +314,16 @@ public class GiraphConfiguration extends Configuration } /** + * Add a MapperObserver class (optional) + * + * @param mapperObserverClass MapperObserver class to add. + */ + public final void addMapperObserverClass( + Class<? extends MapperObserver> mapperObserverClass) { + MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass); + } + + /** * Get job observer class * * @return GiraphJobObserver class set. @@ -687,6 +698,15 @@ public class GiraphConfiguration extends Configuration } /** + * Get array of MapperObserver classes set in configuration. + * + * @return array of MapperObserver classes. + */ + public Class<? extends MapperObserver>[] getMapperObserverClasses() { + return MAPPER_OBSERVER_CLASSES.getArray(this); + } + + /** * Whether to track, print, and aggregate metrics. * * @return true if metrics are enabled, false otherwise (default) http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 4787d37..b7f0d5c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -48,6 +48,7 @@ import org.apache.giraph.graph.DefaultVertex; import org.apache.giraph.graph.DefaultVertexResolver; import org.apache.giraph.graph.DefaultVertexValueCombiner; import org.apache.giraph.graph.Language; +import org.apache.giraph.graph.MapperObserver; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.graph.VertexValueCombiner; @@ -202,6 +203,10 @@ public interface GiraphConstants { ClassConfOption<WorkerObserver> WORKER_OBSERVER_CLASSES = ClassConfOption.create("giraph.worker.observers", null, WorkerObserver.class, "Classes for Worker Observer - optional"); + /** Classes for Mapper Observer - optional */ + ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES = + ClassConfOption.create("giraph.mapper.observers", null, + MapperObserver.class, "Classes for Mapper Observer - optional"); /** Message combiner class - optional */ ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS = ClassConfOption.create("giraph.messageCombinerClass", null, http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 38bf101..130c581 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -41,6 +41,7 @@ import org.apache.giraph.factories.VertexIdFactory; import org.apache.giraph.factories.VertexValueFactory; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.Language; +import org.apache.giraph.graph.MapperObserver; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.graph.VertexValueCombiner; @@ -756,6 +757,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create array of MapperObservers. + * + * @return Instantiated array of MapperObservers. + */ + public MapperObserver[] createMapperObservers() { + Class<? extends MapperObserver>[] klasses = getMapperObserverClasses(); + MapperObserver[] objects = new MapperObserver[klasses.length]; + for (int i = 0; i < klasses.length; ++i) { + objects[i] = ReflectionUtils.newInstance(klasses[i], this); + } + return objects; + } + + /** * Create job observer * * @return GiraphJobObserver set in configuration. http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 62a87de..19ac615 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -160,6 +160,8 @@ end[PURE_YARN]*/ private final Mapper<?, ?, ?, ?>.Context context; /** is this GraphTaskManager the master? */ private boolean isMaster; + /** Mapper observers */ + private MapperObserver[] mapperObservers; /** * Default constructor for GiraphTaskManager. @@ -206,6 +208,7 @@ end[PURE_YARN]*/ context.setStatus("setup: Beginning worker setup."); Configuration hadoopConf = context.getConfiguration(); conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf); + setupMapperObservers(); initializeJobProgressTracker(); // Write user's graph types (I,V,E,M) back to configuration parameters so // that they are set for quicker access later. These types are often @@ -872,6 +875,16 @@ end[PURE_YARN]*/ } /** + * Setup mapper observers + */ + public void setupMapperObservers() { + mapperObservers = conf.createMapperObservers(); + for (MapperObserver mapperObserver : mapperObservers) { + mapperObserver.setup(); + } + } + + /** * Executes preLoad() on worker observers. */ private void preLoadOnWorkerObservers() { http://git-wip-us.apache.org/repos/asf/giraph/blob/093e8199/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java new file mode 100644 index 0000000..cfbb421 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/MapperObserver.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +/** + * Mapper observer + */ +public interface MapperObserver { + /** + * Setup mapper. Called in the beginning of mapper setup, the only thing + * which happens before is configuration preparation. + */ + void setup(); +}
