Repository: giraph Updated Branches: refs/heads/trunk 608d50697 -> 9b6d6f9f6
GIRAPH-1065: Allow extending JobProgressTrackerService Summary: We might want to perform additional actions on events from JobProgressTrackerService. Allow overriding it and specifying another class to use. Test Plan: Ran a job with a custom JobProgressTrackerService and verified that its actions are called Differential Revision: https://reviews.facebook.net/D58383 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9b6d6f9f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9b6d6f9f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9b6d6f9f Branch: refs/heads/trunk Commit: 9b6d6f9f6f91c5d6a8a153635f7e5f3c52a1f4f2 Parents: 608d506 Author: Maja Kabiljo <[email protected]> Authored: Wed May 18 09:27:06 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu May 19 02:01:26 2016 -0700 ---------------------------------------------------------------------- .gitignore | 3 + .../org/apache/giraph/conf/GiraphConstants.java | 9 + .../job/DefaultJobProgressTrackerService.java | 217 +++++++++++++++++++ .../java/org/apache/giraph/job/GiraphJob.java | 2 +- .../giraph/job/JobProgressTrackerService.java | 189 +--------------- src/site/xdoc/options.xml | 6 + 6 files changed, 244 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 42ecd00..3ae52e2 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ failed-profile.txt /for-each-profile-results.txt /giraph-hive/derby.log + +# File with all giraph conf options +/src/site/xdoc/options.xml http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 15eca3c..b5bb9ed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -60,9 +60,11 @@ import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.job.DefaultGiraphJobRetryChecker; import org.apache.giraph.job.DefaultJobObserver; +import org.apache.giraph.job.DefaultJobProgressTrackerService; import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.job.HaltApplicationUtils; +import org.apache.giraph.job.JobProgressTrackerService; import org.apache.giraph.mapping.MappingStore; import org.apache.giraph.mapping.MappingStoreOps; import org.apache.giraph.mapping.translate.TranslateEdge; @@ -1170,6 +1172,13 @@ public interface GiraphConstants { new BooleanConfOption("giraph.trackJobProgressOnClient", false, "Whether to track job progress on client or not"); + /** Class to use to track job progress on client */ + ClassConfOption<JobProgressTrackerService> JOB_PROGRESS_TRACKER_CLASS = + ClassConfOption.create("giraph.jobProgressTrackerClass", + DefaultJobProgressTrackerService.class, + JobProgressTrackerService.class, + "Class to use to track job progress on client"); + /** Number of retries for creating the HDFS files */ IntConfOption HDFS_FILE_CREATION_RETRIES = new IntConfOption("giraph.hdfs.file.creation.retries", 10, http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java 
b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java new file mode 100644 index 0000000..adca42b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.job; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.worker.WorkerProgress; +import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Default implementation of JobProgressTrackerService + */ +public class DefaultJobProgressTrackerService + implements JobProgressTrackerService { + /** Class logger, logging under the JobProgressTrackerService category */ + private static final Logger LOG = + Logger.getLogger(JobProgressTrackerService.class); + /** How often to print job's progress */ + private static final int UPDATE_MILLISECONDS = 10 * 1000; + + /** Configuration */ + private GiraphConfiguration conf; + /** Giraph job callback */ + private GiraphJobObserver jobObserver; + /** Thread which periodically writes job's progress */ + private Thread writerThread; + /** Whether application is finished */ + private volatile boolean finished = false; + /** Number of mappers which the job got (also read by writer thread) */ + private volatile int mappersStarted; + /** Last time number of mappers started was logged */ + private long lastTimeMappersStartedLogged; + /** Map of worker progresses */ + private final Map<Integer, WorkerProgress> workerProgresses = + new ConcurrentHashMap<>(); + /** Job */ + private Job job; + + @Override + public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) { + this.conf = conf; + this.jobObserver = jobObserver; + + if (LOG.isInfoEnabled()) { + LOG.info("Waiting for job to start... 
(this may take a minute)"); + } + startWriterThread(); + } + + /** + * Start the thread which writes progress periodically + */ + private void startWriterThread() { + writerThread = new Thread(new Runnable() { + @Override + public void run() { + while (!finished) { + if (mappersStarted == conf.getMaxWorkers() + 1 && + !workerProgresses.isEmpty()) { + // Combine and log + CombinedWorkerProgress combinedWorkerProgress = + new CombinedWorkerProgress(workerProgresses.values(), conf); + if (LOG.isInfoEnabled()) { + LOG.info(combinedWorkerProgress.toString()); + } + // Check if application is done + if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { + break; + } + } + try { + Thread.sleep(UPDATE_MILLISECONDS); + } catch (InterruptedException e) { + if (LOG.isInfoEnabled()) { + LOG.info("Progress thread interrupted"); + } + break; + } + } + } + }); + writerThread.setDaemon(true); + writerThread.start(); + } + + @Override + public void setJob(Job job) { + this.job = job; + } + + /** + * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS + * and potentially start a thread which will kill the job after this time + */ + private void jobGotAllMappers() { + jobObserver.jobGotAllMappers(job); + final long maxAllowedJobTimeMs = + GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf); + if (maxAllowedJobTimeMs > 0) { + // Start a thread which will kill the job if running for too long + Thread killThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(maxAllowedJobTimeMs); + try { + LOG.warn("Killing job because it took longer than " + + maxAllowedJobTimeMs + " milliseconds"); + job.killJob(); + } catch (IOException e) { + LOG.warn("Failed to kill job", e); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Thread checking for jobs max allowed time " + + "interrupted"); + } + } + } + }); + killThread.setDaemon(true); + killThread.start(); + } + } + + @Override + public synchronized void
mapperStarted() { + mappersStarted++; + if (mappersStarted == conf.getMaxWorkers() + 1) { + if (LOG.isInfoEnabled()) { + LOG.info("Got all " + mappersStarted + " mappers"); + } + // Run regardless of log level (observer callback, max job time check) + jobGotAllMappers(); + } else if (LOG.isInfoEnabled() && + System.currentTimeMillis() - lastTimeMappersStartedLogged > + UPDATE_MILLISECONDS) { + lastTimeMappersStartedLogged = System.currentTimeMillis(); + LOG.info("Got " + mappersStarted + " but needs " + + (conf.getMaxWorkers() + 1) + " mappers"); + } + } + + @Override + public void logInfo(String logLine) { + if (LOG.isInfoEnabled()) { + LOG.info(logLine); + } + } + + @Override + public void logError(String logLine) { + LOG.error(logLine); + } + + @Override + public void logFailure(String reason) { + LOG.fatal(reason); + finished = true; + writerThread.interrupt(); + } + + @Override + public void updateProgress(WorkerProgress workerProgress) { + workerProgresses.put(workerProgress.getTaskId(), workerProgress); + } + + @Override + public void stop(boolean succeeded) { + finished = true; + writerThread.interrupt(); + if (LOG.isInfoEnabled()) { + LOG.info("Job " + (succeeded ? "finished successfully" : "failed") + + ", cleaning up..."); + } + } + + /** + * Create job progress server on job client if enabled in configuration.
+ * + * @param conf Configuration + * @param jobObserver Giraph job callbacks + * @return JobProgressTrackerService, or null if tracking is disabled + */ + public static JobProgressTrackerService createJobProgressTrackerService( + GiraphConfiguration conf, GiraphJobObserver jobObserver) { + if (!conf.trackJobProgressOnClient()) { + return null; + } + + JobProgressTrackerService jobProgressTrackerService = + GiraphConstants.JOB_PROGRESS_TRACKER_CLASS.newInstance(conf); + jobProgressTrackerService.init(conf, jobObserver); + return jobProgressTrackerService; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 90a73c6..b11f131 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -241,7 +241,7 @@ public class GiraphJob { GiraphJobObserver jobObserver = conf.getJobObserver(); JobProgressTrackerService jobProgressTrackerService = - JobProgressTrackerService.createJobProgressTrackerService( + DefaultJobProgressTrackerService.createJobProgressTrackerService( conf, jobObserver); ClientThriftServer clientThriftServer = null; if (jobProgressTrackerService != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java index c0189c0..e4b2b66 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java +++
b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -19,204 +19,31 @@ package org.apache.giraph.job; import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.worker.WorkerProgress; import org.apache.hadoop.mapreduce.Job; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * Implementation of job progress tracker service on job client */ -public class JobProgressTrackerService implements JobProgressTracker { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(JobProgressTrackerService.class); - /** How often to print job's progress */ - private static final int UPDATE_MILLISECONDS = 10 * 1000; - - /** Configuration */ - private final GiraphConfiguration conf; - /** Giraph job callback */ - private final GiraphJobObserver jobObserver; - /** Thread which periodically writes job's progress */ - private Thread writerThread; - /** Whether application is finished */ - private volatile boolean finished = false; - /** Number of mappers which the job got */ - private int mappersStarted; - /** Last time number of mappers started was logged */ - private long lastTimeMappersStartedLogged; - /** Map of worker progresses */ - private final Map<Integer, WorkerProgress> workerProgresses = - new ConcurrentHashMap<>(); - /** Job */ - private Job job; - +public interface JobProgressTrackerService extends JobProgressTracker { /** - * Constructor + * Initialize the service. Invoked right after the instance is created * * @param conf Configuration * @param jobObserver Giraph job callbacks */ - public JobProgressTrackerService(GiraphConfiguration conf, - GiraphJobObserver jobObserver) { - this.conf = conf; - this.jobObserver = jobObserver; - - if (LOG.isInfoEnabled()) { - LOG.info("Waiting for job to start... 
(this may take a minute)"); - } - startWriterThread(); - } + void init(GiraphConfiguration conf, GiraphJobObserver jobObserver); /** - * Start the thread which writes progress periodically + * Set the Hadoop job whose progress this service tracks + * + * @param job Job being tracked */ - private void startWriterThread() { - writerThread = new Thread(new Runnable() { - @Override - public void run() { - while (!finished) { - if (mappersStarted == conf.getMaxWorkers() + 1 && - !workerProgresses.isEmpty()) { - // Combine and log - CombinedWorkerProgress combinedWorkerProgress = - new CombinedWorkerProgress(workerProgresses.values(), conf); - if (LOG.isInfoEnabled()) { - LOG.info(combinedWorkerProgress.toString()); - } - // Check if application is done - if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { - break; - } - } - try { - Thread.sleep(UPDATE_MILLISECONDS); - } catch (InterruptedException e) { - if (LOG.isInfoEnabled()) { - LOG.info("Progress thread interrupted"); - } - break; - } - } - } - }); - writerThread.setDaemon(true); - writerThread.start(); - } - - public void setJob(Job job) { - this.job = job; - } - - /** - * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS - * and potentially start a thread which will kill the job after this time - */ - private void jobGotAllMappers() { - jobObserver.jobGotAllMappers(job); - final long maxAllowedJobTimeMs = - GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf); - if (maxAllowedJobTimeMs > 0) { - // Start a thread which will kill the job if running for too long - Thread killThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(maxAllowedJobTimeMs); - try { - LOG.warn("Killing job because it took longer than " + - maxAllowedJobTimeMs + " milliseconds"); - job.killJob(); - } catch (IOException e) { - LOG.warn("Failed to kill job", e); - } - } catch (InterruptedException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Thread checking for jobs max allowed time " + - 
"interrupted"); - } - } - } - }); -
killThread.setDaemon(true); - killThread.start(); - } - } - - @Override - public synchronized void mapperStarted() { - mappersStarted++; - if (LOG.isInfoEnabled()) { - if (mappersStarted == conf.getMaxWorkers() + 1) { - LOG.info("Got all " + mappersStarted + " mappers"); - jobGotAllMappers(); - } else { - if (System.currentTimeMillis() - lastTimeMappersStartedLogged > - UPDATE_MILLISECONDS) { - lastTimeMappersStartedLogged = System.currentTimeMillis(); - LOG.info("Got " + mappersStarted + " but needs " + - (conf.getMaxWorkers() + 1) + " mappers"); - } - } - } - } - - @Override - public void logInfo(String logLine) { - if (LOG.isInfoEnabled()) { - LOG.info(logLine); - } - } - - @Override - public void logError(String logLine) { - LOG.error(logLine); - } - - @Override - public void logFailure(String reason) { - LOG.fatal(reason); - finished = true; - writerThread.interrupt(); - } - - @Override - public void updateProgress(WorkerProgress workerProgress) { - workerProgresses.put(workerProgress.getTaskId(), workerProgress); - } + void setJob(Job job); /** * Stop the thread which logs application progress and server * * @param succeeded Whether job succeeded or not */ - public void stop(boolean succeeded) { - finished = true; - writerThread.interrupt(); - if (LOG.isInfoEnabled()) { - LOG.info("Job " + (succeeded ? "finished successfully" : "failed") + - ", cleaning up..."); - } - } - - /** - * Create job progress server on job client if enabled in configuration.
- * - * @param conf Configuration - * @param jobObserver Giraph job callbacks - * @return JobProgressTrackerService - */ - public static JobProgressTrackerService createJobProgressTrackerService( - GiraphConfiguration conf, GiraphJobObserver jobObserver) { - if (!conf.trackJobProgressOnClient()) { - return null; - } - - return new JobProgressTrackerService(conf, jobObserver); - } + void stop(boolean succeeded); } http://git-wip-us.apache.org/repos/asf/giraph/blob/9b6d6f9f/src/site/xdoc/options.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml index 2735575..55638ae 100644 --- a/src/site/xdoc/options.xml +++ b/src/site/xdoc/options.xml @@ -376,6 +376,12 @@ under the License. <td>Observer class to watch over job status - optional</td> </tr> <tr> + <td>giraph.jobProgressTrackerClass</td> + <td>class</td> + <td>DefaultJobProgressTrackerService</td> + <td>Class to use to track job progress on client</td> + </tr> + <tr> <td>giraph.jobRetryCheckerClass</td> <td>class</td> <td>DefaultGiraphJobRetryChecker</td>
