Repository: giraph Updated Branches: refs/heads/trunk d455270e2 -> fc2677348
GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc267734 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc267734 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc267734 Branch: refs/heads/trunk Commit: fc267734887dddd7c379144d6d5499fe3d541db8 Parents: d455270 Author: Pavan Kumar <[email protected]> Authored: Tue Aug 5 14:35:04 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Tue Aug 5 14:35:32 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/graph/GraphTaskManager.java | 22 ++++++++++ .../apache/giraph/utils/JMapHistoDumper.java | 12 +++++- .../org/apache/giraph/utils/LogVersions.java | 6 +++ .../giraph/utils/ReactiveJMapHistoDumper.java | 12 +++++- .../giraph/worker/DefaultWorkerObserver.java | 16 +++++-- .../apache/giraph/worker/WorkerObserver.java | 44 +++++++++++++------- 7 files changed, 90 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 3c8e155..300215a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-893: Implement preLoad & postSave on workerObservers (pavanka) + GIRAPH-936: AsyncMessageStoreWrapper threads are not daemonized (edunov via majakabiljo) GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 684f4eb..6ebb002 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -255,6 +255,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (checkTaskState()) { return; } + preLoadOnWorkerObservers(); finishedSuperstepStats = serviceWorker.setup(); if (collectInputSuperstepStats(finishedSuperstepStats)) { return; @@ -830,6 +831,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } /** + * Executes preLoad() on worker observers. + */ + private void preLoadOnWorkerObservers() { + for (WorkerObserver obs : serviceWorker.getWorkerObservers()) { + obs.preLoad(); + context.progress(); + } + } + + /** + * Executes postSave() on worker observers. + */ + private void postSaveOnWorkerObservers() { + for (WorkerObserver obs : serviceWorker.getWorkerObservers()) { + obs.postSave(); + context.progress(); + } + } + + /** * Called by owner of this GraphTaskManager object on each compute node */ public void cleanup() @@ -843,6 +864,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (serviceWorker != null) { serviceWorker.cleanup(finishedSuperstepStats); + postSaveOnWorkerObservers(); } try { if (masterThread != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java index 3bcf42e..f90337f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java @@ -45,17 +45,25 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver { private boolean stop = false; @Override - public void preApplication() { + public void preLoad() { // This is called by both WorkerObserver and MasterObserver startJMapThread(); } @Override - public void postApplication() { + public void postSave() { // This is called by both WorkerObserver and MasterObserver joinJMapThread(); } + @Override + public void preApplication() { + } + + @Override + public void postApplication() { + } + /** * Join the jmap thread */ http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java index 8305df7..5bdad87 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java @@ -37,6 +37,12 @@ public class LogVersions implements WorkerObserver, MasterObserver { } @Override + public void preLoad() { } + + @Override + public void postSave() { } + + @Override public void preApplication() { GiraphDepVersions.get().logVersionsUsed(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java index 68369d9..844f929 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java @@ -52,17 +52,25 @@ public class ReactiveJMapHistoDumper extends private volatile boolean stop = false; @Override - public void preApplication() { + public void preLoad() { // This is called by both WorkerObserver and MasterObserver startSupervisorThread(); } @Override - public void postApplication() { + public void postSave() { // This is called by both WorkerObserver and MasterObserver joinSupervisorThread(); } + @Override + public void preApplication() { + } + + @Override + public void postApplication() { + } + /** * Join the supervisor thread */ http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java index 5c8c94a..694c4ed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/DefaultWorkerObserver.java @@ -18,18 +18,26 @@ package org.apache.giraph.worker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; /** * Default Observer for Worker that does nothing. */ -public class DefaultWorkerObserver implements WorkerObserver, - ImmutableClassesGiraphConfigurable { - /** The configuration stored here */ +public class DefaultWorkerObserver implements WorkerObserver { + /** + * The configuration stored here + */ private ImmutableClassesGiraphConfiguration conf; @Override + public void preLoad() { + } + + @Override + public void postSave() { + } + + @Override public void preApplication() { } http://git-wip-us.apache.org/repos/asf/giraph/blob/fc267734/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java index fc62629..b1b40db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerObserver.java @@ -18,39 +18,51 @@ package org.apache.giraph.worker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; + /** - * Observer for Worker + * Observer for worker. The user can subclass and register an observer with the + * Giraph framework. The framework will execute methods of the observer at + * designated moments of computation on each worker. */ -public interface WorkerObserver { +public interface WorkerObserver extends ImmutableClassesGiraphConfigurable { /** - * Initialize the WorkerContext. - * This method is executed once on each Worker before the first - * superstep starts. + * Initialize the observer. This method is executed once on each worker before + * loading. */ - void preApplication(); + void preLoad(); /** - * Finalize the WorkerContext. - * This method is executed once on each Worker after the last - * superstep ends. + * Initialize the observer. This method is executed once on each worker after + * loading before the first superstep starts. */ - void postApplication(); + void preApplication(); /** - * Execute user code. - * This method is executed once on each Worker before each - * superstep starts. + * Execute the observer. This method is executed once on each worker before + * each superstep starts. * * @param superstep number of superstep */ void preSuperstep(long superstep); /** - * Execute user code. - * This method is executed once on each Worker after each - * superstep ends. + * Execute the observer. This method is executed once on each worker after + * each superstep ends. * * @param superstep number of superstep */ void postSuperstep(long superstep); + + /** + * Finalize the observer. This method is executed once on each worker after + * the last superstep ends before saving. + */ + void postApplication(); + + /** + * Finalize the observer. This method is executed once on each worker after + * saving. + */ + void postSave(); }
