Updated Branches: refs/heads/trunk e91875175 -> c1ef88914
GIRAPH-501: WorkerObserver Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1ef8891 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1ef8891 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1ef8891 Branch: refs/heads/trunk Commit: c1ef889142ae5e2526f8b83138d0dfee31fb7db6 Parents: e918751 Author: Nitay Joffe <[email protected]> Authored: Tue Feb 5 14:35:19 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Tue Feb 5 17:43:26 2013 -0500 ---------------------------------------------------------------------- .../giraph/bsp/CentralizedServiceWorker.java | 27 +++++--- .../apache/giraph/conf/GiraphConfiguration.java | 25 ++++++- .../org/apache/giraph/conf/GiraphConstants.java | 4 +- .../conf/ImmutableClassesGiraphConfiguration.java | 15 ++++ .../org/apache/giraph/graph/GraphTaskManager.java | 36 ++++++++-- .../org/apache/giraph/worker/BspServiceWorker.java | 34 ++++++++-- .../giraph/worker/DefaultWorkerObserver.java | 57 +++++++++++++++ .../org/apache/giraph/worker/WorkerObserver.java | 56 ++++++++++++++ 8 files changed, 228 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index 71f8f72..56b5d03 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -18,26 +18,26 @@ package org.apache.giraph.bsp; -import java.io.IOException; -import java.util.Collection; -import java.util.List; - import org.apache.giraph.comm.ServerData; 
import org.apache.giraph.comm.WorkerClient; import org.apache.giraph.graph.FinishedSuperstepStats; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.GraphTaskManager; -import org.apache.giraph.master.MasterInfo; import org.apache.giraph.graph.VertexEdgeCount; -import org.apache.giraph.worker.WorkerAggregatorHandler; +import org.apache.giraph.master.MasterInfo; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.worker.WorkerAggregatorHandler; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerInfo; +import org.apache.giraph.worker.WorkerObserver; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.partition.PartitionStats; -import org.apache.giraph.worker.WorkerInfo; -import org.apache.giraph.worker.WorkerContext; +import java.io.IOException; +import java.util.Collection; +import java.util.List; /** * All workers should have access to this centralized service to @@ -82,6 +82,13 @@ public interface CentralizedServiceWorker<I extends WritableComparable, WorkerContext getWorkerContext(); /** + * Get the observers that receive this worker's lifecycle callbacks. + * + * @return array of WorkerObservers, called back in array order. + */ + WorkerObserver[] getWorkerObservers(); + + /** * Get the partition store for this worker.
* The partitions contain the vertices for * this worker and can be used to run compute() for the vertices or do http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index b3b9c4b..9ca1e7e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -32,6 +32,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.Partition; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerObserver; import org.apache.hadoop.conf.Configuration; /** @@ -115,6 +116,17 @@ public class GiraphConfiguration extends Configuration } /** + * Add a WorkerObserver class (optional) to WORKER_OBSERVER_CLASSES. + * + * @param workerObserverClass WorkerObserver class to add. + */ + public final void addWorkerObserverClass( + Class<? extends WorkerObserver> workerObserverClass) { + addToClasses(WORKER_OBSERVER_CLASSES, workerObserverClass, + WorkerObserver.class); + } + + /** * Get job observer class * * @return GiraphJobObserver class set. @@ -271,9 +283,7 @@ public class GiraphConfiguration extends Configuration */ public final void setWorkerContextClass( Class<? extends WorkerContext> workerContextClass) { - setClass(WORKER_CONTEXT_CLASS, - workerContextClass, - WorkerContext.class); + setClass(WORKER_CONTEXT_CLASS, workerContextClass, WorkerContext.class); } /** @@ -356,6 +366,15 @@ public class GiraphConfiguration extends Configuration } /** + * Get array of WorkerObserver classes set in WORKER_OBSERVER_CLASSES. + * + * @return array of WorkerObserver classes. + */ + public Class<?
extends WorkerObserver>[] getWorkerObserverClasses() { + return getClassesOfType(WORKER_OBSERVER_CLASSES, WorkerObserver.class); + } + + /** * Whether to track, print, and aggregate metrics. * * @return true if metrics are enabled, false otherwise (default) http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 51415c2..fb4e8a3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -27,8 +27,10 @@ public interface GiraphConstants { /** Class for Master - optional */ String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass"; - /** Classes for Observer Master - optional */ + /** Classes for Master Observer - optional */ String MASTER_OBSERVER_CLASSES = "giraph.master.observers"; + /** Classes for Worker Observer - optional */ + String WORKER_OBSERVER_CLASSES = "giraph.worker.observers"; /** Vertex combiner class - optional */ String VERTEX_COMBINER_CLASS = "giraph.combinerClass"; /** Vertex resolver class - optional */ http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index d75d624..30a7da7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -41,6 +41,7
@@ import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.giraph.worker.WorkerObserver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -417,6 +418,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create one WorkerObserver per class returned by getWorkerObserverClasses(). + * + * @return Instantiated array of WorkerObservers, in configured order. + */ + public WorkerObserver[] createWorkerObservers() { + Class<? extends WorkerObserver>[] klasses = getWorkerObserverClasses(); + WorkerObserver[] objects = new WorkerObserver[klasses.length]; + for (int i = 0; i < klasses.length; ++i) { + objects[i] = ReflectionUtils.newInstance(klasses[i], this); + } + return objects; + } + + /** * Create job observer * @return GiraphJobObserver set in configuration.
*/ http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 4ede8bb..f7fb7e9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -45,6 +45,7 @@ import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.BspServiceWorker; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerObserver; import org.apache.giraph.zk.ZooKeeperManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -252,8 +253,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, context.progress(); graphState = checkSuperstepRestarted( aggregatorUsage, superstep, graphState); - GiraphTimerContext perSuperstepTimer = prepareForSuperstep(graphState); - perSuperstepTimer.stop(); + prepareForSuperstep(graphState); context.progress(); MessageStoreByPartition<I, M> messageStore = serviceWorker.getServerData().getCurrentMessageStore(); @@ -279,10 +279,22 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, LOG.info("execute: BSP application done (global vertices marked done)"); } updateSuperstepGraphState(aggregatorUsage); + postApplication(); + } + + /** + * Handle post-application callbacks: the WorkerContext first, then each WorkerObserver.
+ */ + private void postApplication() { GiraphTimerContext postAppTimerContext = wcPostAppTimer.time(); serviceWorker.getWorkerContext().postApplication(); postAppTimerContext.stop(); context.progress(); + + for (WorkerObserver obs : serviceWorker.getWorkerObservers()) { + obs.postApplication(); + context.progress(); + } } /** @@ -365,15 +377,20 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * Utility function to prepare various objects managing BSP superstep * operations for the next superstep. * @param graphState graph state metadata object - * @return the timer context for superstep metrics */ - private GiraphTimerContext prepareForSuperstep( - GraphState<I, V, E, M> graphState) { + private void prepareForSuperstep(GraphState<I, V, E, M> graphState) { serviceWorker.prepareSuperstep(); + serviceWorker.getWorkerContext().setGraphState(graphState); - GiraphTimerContext perSuperstepTimer = wcPreSuperstepTimer.time(); + GiraphTimerContext preSuperstepTimer = wcPreSuperstepTimer.time(); serviceWorker.getWorkerContext().preSuperstep(); - return perSuperstepTimer; + preSuperstepTimer.stop(); + context.progress(); + + for (WorkerObserver obs : serviceWorker.getWorkerObservers()) { + obs.preSuperstep(graphState.getSuperstep()); + context.progress(); + } } /** @@ -819,6 +836,11 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } preAppTimerContext.stop(); context.progress(); + + for (WorkerObserver obs : serviceWorker.getWorkerObservers()) { + obs.preApplication(); + context.progress(); + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 71ea749..e48e01a 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -147,6 +147,9 @@ public class BspServiceWorker<I extends WritableComparable, /** Handler for aggregators */ private final WorkerAggregatorHandler aggregatorHandler; + /** Observers created from the configuration, called back in array order */ + private final WorkerObserver[] observers; + // Per-Superstep Metrics /** Timer for WorkerContext#postSuperstep */ private GiraphTimer wcPostSuperstepTimer; @@ -191,6 +194,8 @@ public class BspServiceWorker<I extends WritableComparable, aggregatorHandler = new WorkerAggregatorHandler(this, getConfiguration(), context); + observers = getConfiguration().createWorkerObservers(); + GiraphMetrics.get().addSuperstepResetObserver(this); } @@ -208,6 +213,11 @@ public class BspServiceWorker<I extends WritableComparable, } @Override + public WorkerObserver[] getWorkerObservers() { + return observers; + } + + @Override public WorkerClient<I, V, E, M> getWorkerClient() { return workerClient; } @@ -750,11 +760,7 @@ else[HADOOP_NON_SECURE]*/ } if (getSuperstep() != INPUT_SUPERSTEP) { - getWorkerContext().setGraphState(graphState); - GiraphTimerContext timerContext = wcPostSuperstepTimer.time(); - getWorkerContext().postSuperstep(); - timerContext.stop(); - getContext().progress(); + postSuperstepCallbacks(graphState); } aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor); @@ -801,6 +807,24 @@ else[HADOOP_NON_SECURE]*/ } /** + * Handle post-superstep callbacks: the WorkerContext first, then each WorkerObserver. + * + * @param graphState GraphState set on the WorkerContext; its superstep is passed to each observer + */ + private void postSuperstepCallbacks(GraphState<I, V, E, M> graphState) { + getWorkerContext().setGraphState(graphState); + GiraphTimerContext timerContext = wcPostSuperstepTimer.time(); + getWorkerContext().postSuperstep(); + timerContext.stop(); + getContext().progress(); + + for (WorkerObserver obs : getWorkerObservers()) { + obs.postSuperstep(graphState.getSuperstep()); + getContext().progress(); + } +
} + + /** * Wait for all the requests to finish. */ private void waitForRequestsToFinish() { http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java new file mode 100644 index 0000000..5c8c94a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; + +/** + * Default Observer for Worker whose callbacks all do nothing; it only stores the configuration passed to setConf.
+ */ +public class DefaultWorkerObserver implements WorkerObserver, + ImmutableClassesGiraphConfigurable { + /** The configuration stored here */ + private ImmutableClassesGiraphConfiguration conf; + + @Override + public void preApplication() { + } + + @Override + public void postApplication() { + } + + @Override + public void preSuperstep(long superstep) { + } + + @Override + public void postSuperstep(long superstep) { + } + + @Override + public ImmutableClassesGiraphConfiguration getConf() { + return conf; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration configuration) { + this.conf = configuration; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/c1ef8891/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java new file mode 100644 index 0000000..fc62629 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.giraph.worker; + +/** + * Observer for Worker lifecycle events (application and superstep boundaries) + */ +public interface WorkerObserver { + /** + * Called before the application starts. + * This method is executed once on each Worker before the first + * superstep starts. + */ + void preApplication(); + + /** + * Called after the application ends. + * This method is executed once on each Worker after the last + * superstep ends. + */ + void postApplication(); + + /** + * Called before a superstep. + * This method is executed once on each Worker before each + * superstep starts. + * + * @param superstep number of the superstep about to start + */ + void preSuperstep(long superstep); + + /** + * Called after a superstep. + * This method is executed once on each Worker after each + * superstep ends. + * + * @param superstep number of the superstep that just ended + */ + void postSuperstep(long superstep); +}
