Repository: giraph Updated Branches: refs/heads/trunk f183402ea -> 4a188d8d1
[GIRAPH-1031] Adding onAllMappersStarted callback Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a188d8d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a188d8d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a188d8d Branch: refs/heads/trunk Commit: 4a188d8d13e44b397bb40ce1280279d0c9b1feb1 Parents: f183402 Author: Sergey Edunov <[email protected]> Authored: Fri Sep 18 17:27:20 2015 -0700 Committer: Sergey Edunov <[email protected]> Committed: Tue Sep 29 10:16:30 2015 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/job/DefaultJobObserver.java | 5 +++++ .../main/java/org/apache/giraph/job/GiraphJob.java | 5 +++-- .../org/apache/giraph/job/GiraphJobObserver.java | 9 +++++++++ .../apache/giraph/job/JobProgressTrackerService.java | 15 ++++++++++++--- 4 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java index ca331b2..54af5c1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobObserver.java @@ -54,4 +54,9 @@ public class DefaultJobObserver implements GiraphJobObserver, public void jobFinished(Job jobToSubmit, boolean passed) { // do nothing } + + @Override + public void jobGotAllMappers(Job job) { + // do nothing + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 62894b6..8792e59 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -237,8 +237,10 @@ public class GiraphJob { int tryCount = 0; GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker(); while (true) { + GiraphJobObserver jobObserver = conf.getJobObserver(); + JobProgressTrackerService jobProgressTrackerService = - JobProgressTrackerService.createJobProgressServer(conf); + JobProgressTrackerService.createJobProgressServer(conf, jobObserver); tryCount++; Job submittedJob = new Job(conf, jobName); @@ -254,7 +256,6 @@ public class GiraphJob { jobProgressTrackerService.setJob(submittedJob); } - GiraphJobObserver jobObserver = conf.getJobObserver(); jobObserver.launchingJob(submittedJob); submittedJob.submit(); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java index 3905f77..1cc90a6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobObserver.java @@ -43,4 +43,13 @@ public interface GiraphJobObserver { * @param passed true if job succeeded. */ void jobFinished(Job submittedJob, boolean passed); + + /** + * Called when job gets all mappers and + * really starts computations. + * May not get called if the job progress tracker + * fails. + * @param job job that runs + */ + void jobGotAllMappers(Job job); } http://git-wip-us.apache.org/repos/asf/giraph/blob/4a188d8d/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java index 064ed5b..6949feb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -48,6 +48,8 @@ public class JobProgressTrackerService implements JobProgressTracker { /** Configuration */ private final GiraphConfiguration conf; + /** Giraph job callback */ + private final GiraphJobObserver jobObserver; /** Thread which periodically writes job's progress */ private Thread writerThread; /** Whether application is finished */ @@ -68,9 +70,13 @@ public class JobProgressTrackerService implements JobProgressTracker { * Constructor * * @param conf Configuration + * @param jobObserver Giraph job callbacks */ - public JobProgressTrackerService(GiraphConfiguration conf) { + public JobProgressTrackerService(GiraphConfiguration conf, + GiraphJobObserver jobObserver) { this.conf = conf; + this.jobObserver = jobObserver; + if (LOG.isInfoEnabled()) { LOG.info("Waiting for job to start... (this may take a minute)"); } @@ -121,6 +127,7 @@ public class JobProgressTrackerService implements JobProgressTracker { * and potentially start a thread which will kill the job after this time */ private void jobGotAllMappers() { + jobObserver.jobGotAllMappers(job); final long maxAllowedJobTimeMs = GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf); if (maxAllowedJobTimeMs > 0) { @@ -206,15 +213,17 @@ public class JobProgressTrackerService implements JobProgressTracker { * null if progress shouldn't be tracked * * @param conf Configuration + * @param jobObserver Giraph job callbacks * @return JobProgressTrackerService */ public static JobProgressTrackerService createJobProgressServer( - GiraphConfiguration conf) { + GiraphConfiguration conf, GiraphJobObserver jobObserver) { if (!conf.trackJobProgressOnClient()) { return null; } try { - JobProgressTrackerService service = new JobProgressTrackerService(conf); + JobProgressTrackerService service = + new JobProgressTrackerService(conf, jobObserver); ThriftServiceProcessor processor = new ThriftServiceProcessor(new ThriftCodecManager(), new ArrayList<ThriftEventHandler>(), service);
