Repository: giraph
Updated Branches:
  refs/heads/trunk f183402ea -> 4a188d8d1


[GIRAPH-1031] Adding onAllMappersStarted callback


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a188d8d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a188d8d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a188d8d

Branch: refs/heads/trunk
Commit: 4a188d8d13e44b397bb40ce1280279d0c9b1feb1
Parents: f183402
Author: Sergey Edunov <[email protected]>
Authored: Fri Sep 18 17:27:20 2015 -0700
Committer: Sergey Edunov <[email protected]>
Committed: Tue Sep 29 10:16:30 2015 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/job/DefaultJobObserver.java    |  5 +++++
 .../main/java/org/apache/giraph/job/GiraphJob.java   |  5 +++--
 .../org/apache/giraph/job/GiraphJobObserver.java     |  9 +++++++++
 .../apache/giraph/job/JobProgressTrackerService.java | 15 ++++++++++++---
 4 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java 
b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
index ca331b2..54af5c1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java
@@ -54,4 +54,9 @@ public class DefaultJobObserver implements GiraphJobObserver,
   public void jobFinished(Job jobToSubmit, boolean passed) {
     // do nothing
   }
+
+  @Override
+  public void jobGotAllMappers(Job job) {
+    // do nothing
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java 
b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 62894b6..8792e59 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -237,8 +237,10 @@ public class GiraphJob {
     int tryCount = 0;
     GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker();
     while (true) {
+      GiraphJobObserver jobObserver = conf.getJobObserver();
+
       JobProgressTrackerService jobProgressTrackerService =
-          JobProgressTrackerService.createJobProgressServer(conf);
+          JobProgressTrackerService.createJobProgressServer(conf, jobObserver);
 
       tryCount++;
       Job submittedJob = new Job(conf, jobName);
@@ -254,7 +256,6 @@ public class GiraphJob {
         jobProgressTrackerService.setJob(submittedJob);
       }
 
-      GiraphJobObserver jobObserver = conf.getJobObserver();
       jobObserver.launchingJob(submittedJob);
       submittedJob.submit();
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java 
b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
index 3905f77..1cc90a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java
@@ -43,4 +43,13 @@ public interface GiraphJobObserver {
    * @param passed true if job succeeded.
    */
   void jobFinished(Job submittedJob, boolean passed);
+
+  /**
+   * Called when job gets all mappers and
+   * really starts computations.
+   * May not get called if the job progress tracker
+   * fails.
+   * @param job job that runs
+   */
+  void jobGotAllMappers(Job job);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
 
b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
index 064ed5b..6949feb 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -48,6 +48,8 @@ public class JobProgressTrackerService implements 
JobProgressTracker {
 
   /** Configuration */
   private final GiraphConfiguration conf;
+  /** Giraph job callback */
+  private final GiraphJobObserver jobObserver;
   /** Thread which periodically writes job's progress */
   private Thread writerThread;
   /** Whether application is finished */
@@ -68,9 +70,13 @@ public class JobProgressTrackerService implements 
JobProgressTracker {
    * Constructor
    *
    * @param conf Configuration
+   * @param jobObserver Giraph job callbacks
    */
-  public JobProgressTrackerService(GiraphConfiguration conf) {
+  public JobProgressTrackerService(GiraphConfiguration conf,
+                                   GiraphJobObserver jobObserver) {
     this.conf = conf;
+    this.jobObserver = jobObserver;
+
     if (LOG.isInfoEnabled()) {
       LOG.info("Waiting for job to start... (this may take a minute)");
     }
@@ -121,6 +127,7 @@ public class JobProgressTrackerService implements 
JobProgressTracker {
    * and potentially start a thread which will kill the job after this time
    */
   private void jobGotAllMappers() {
+    jobObserver.jobGotAllMappers(job);
     final long maxAllowedJobTimeMs =
         GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
     if (maxAllowedJobTimeMs > 0) {
@@ -206,15 +213,17 @@ public class JobProgressTrackerService implements 
JobProgressTracker {
    * null if progress shouldn't be tracked
    *
    * @param conf Configuration
+   * @param jobObserver Giraph job callbacks
    * @return JobProgressTrackerService
    */
   public static JobProgressTrackerService createJobProgressServer(
-      GiraphConfiguration conf) {
+      GiraphConfiguration conf, GiraphJobObserver jobObserver) {
     if (!conf.trackJobProgressOnClient()) {
       return null;
     }
     try {
-      JobProgressTrackerService service = new JobProgressTrackerService(conf);
+      JobProgressTrackerService service =
+          new JobProgressTrackerService(conf, jobObserver);
       ThriftServiceProcessor processor =
           new ThriftServiceProcessor(new ThriftCodecManager(),
               new ArrayList<ThriftEventHandler>(), service);

Reply via email to