Repository: giraph Updated Branches: refs/heads/trunk da3c7b2d6 -> 61db68912
GIRAPH-952: Limit job runtime Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61db6891 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61db6891 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61db6891 Branch: refs/heads/trunk Commit: 61db689128679b886753fed0b8f310b0ece9e0cf Parents: da3c7b2 Author: Maja Kabiljo <[email protected]> Authored: Thu Oct 2 15:28:23 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Oct 3 09:38:22 2014 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/conf/GiraphConstants.java | 10 +++++ .../java/org/apache/giraph/job/GiraphJob.java | 3 ++ .../giraph/job/JobProgressTrackerService.java | 42 ++++++++++++++++++++ 3 files changed, 55 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index d1fdf57..e78eb42 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -251,6 +251,16 @@ public interface GiraphConstants { "Class which decides whether a failed job should be retried - " + "optional"); + /** + * Maximum allowed time for job to run after getting all resources before it + * will be killed, in milliseconds (-1 if it has no limit) + */ + LongConfOption MAX_ALLOWED_JOB_TIME_MS = + new LongConfOption("giraph.maxAllowedJobTimeMilliseconds", -1, + "Maximum allowed time for job to run after getting all resources " + + "before it will be killed, in milliseconds " + + "(-1 if it has no limit)"); + // At least one of the input format classes is required. /** VertexInputFormat class */ ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS = http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 491d3d2..ca1ad1c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -254,6 +254,9 @@ public class GiraphJob { submittedJob.setMapperClass(GraphMapper.class); submittedJob.setInputFormatClass(BspInputFormat.class); submittedJob.setOutputFormatClass(BspOutputFormat.class); + if (jobProgressTrackerService != null) { + jobProgressTrackerService.setJob(submittedJob); + } GiraphJobObserver jobObserver = conf.getJobObserver(); jobObserver.launchingJob(submittedJob); http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java index 3a896e2..49610de 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -19,7 +19,9 @@ package org.apache.giraph.job; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.worker.WorkerProgress; +import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Logger; import com.facebook.swift.codec.ThriftCodecManager; @@ -28,6 +30,7 @@ import com.facebook.swift.service.ThriftServer; import com.facebook.swift.service.ThriftServerConfig; import com.facebook.swift.service.ThriftServiceProcessor; +import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Map; @@ -58,6 +61,8 @@ public class JobProgressTrackerService implements JobProgressTracker { /** Map of worker progresses */ private final Map<Integer, WorkerProgress> workerProgresses = new ConcurrentHashMap<>(); + /** Job */ + private Job job; /** * Constructor @@ -107,12 +112,49 @@ public class JobProgressTrackerService implements JobProgressTracker { writerThread.start(); } + public void setJob(Job job) { + this.job = job; + } + + /** + * Called when job got all mappers, used to check MAX_ALLOWED_JOB_TIME_MS + * and potentially start a thread which will kill the job after this time + */ + private void jobGotAllMappers() { + final long maxAllowedJobTimeMs = + GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf); + if (maxAllowedJobTimeMs > 0) { + // Start a thread which will kill the job if running for too long + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(maxAllowedJobTimeMs); + try { + LOG.warn("Killing job because it took longer than " + + maxAllowedJobTimeMs + " milliseconds"); + job.killJob(); + } catch (IOException e) { + LOG.warn("Failed to kill job", e); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Thread checking for jobs max allowed time " + + "interrupted"); + } + } + } + }).start(); + } + } + @Override public synchronized void mapperStarted() { mappersStarted++; if (LOG.isInfoEnabled()) { if (mappersStarted == conf.getMaxWorkers() + 1) { LOG.info("Got all " + mappersStarted + " mappers"); + jobGotAllMappers(); } else { if (System.currentTimeMillis() - lastTimeMappersStartedLogged > UPDATE_MILLISECONDS) {
