Updated Branches: refs/heads/trunk 909d4c3cc -> f2b28e14b
GIRAPH-790: Add a way to automatically retry a job (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f2b28e14 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f2b28e14 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f2b28e14 Branch: refs/heads/trunk Commit: f2b28e14b6a8b70c25e86e9823e7ed3f64aaf459 Parents: 909d4c3 Author: Maja Kabiljo <[email protected]> Authored: Fri Nov 1 14:58:10 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Nov 1 14:59:08 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 20 ++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 9 ++++ .../ImmutableClassesGiraphConfiguration.java | 10 ++++ .../job/DefaultGiraphJobRetryChecker.java | 33 ++++++++++++++ .../java/org/apache/giraph/job/GiraphJob.java | 48 ++++++++++++-------- .../giraph/job/GiraphJobRetryChecker.java | 36 +++++++++++++++ 7 files changed, 139 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 4465ac1..7df5b3e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-790: Add a way to automatically retry a job (majakabiljo) + GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo) GIRAPH-787: Use HiveIO 1.9 (gmalewicz via aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 4dee396..f176bfe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -34,6 +34,7 @@ import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; import org.apache.giraph.job.GiraphJobObserver; +import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; import org.apache.giraph.partition.GraphPartitionerFactory; @@ -299,6 +300,25 @@ public class GiraphConfiguration extends Configuration } /** + * Get job retry checker class + * + * @return GiraphJobRetryChecker class set. + */ + public Class<? extends GiraphJobRetryChecker> getJobRetryCheckerClass() { + return JOB_RETRY_CHECKER_CLASS.get(this); + } + + /** + * Set job retry checker class + * + * @param klass GiraphJobRetryChecker class to set. + */ + public void setJobRetryCheckerClass( + Class<? extends GiraphJobRetryChecker> klass) { + JOB_RETRY_CHECKER_CLASS.set(this, klass); + } + + /** * Check whether to enable jmap dumping thread. * * @return true if jmap dumper is enabled. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index eb8eb21..6f32e46 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -47,8 +47,10 @@ import org.apache.giraph.io.filters.DefaultEdgeInputFilter; import org.apache.giraph.io.filters.DefaultVertexInputFilter; import org.apache.giraph.io.filters.EdgeInputFilter; import org.apache.giraph.io.filters.VertexInputFilter; +import org.apache.giraph.job.DefaultGiraphJobRetryChecker; import org.apache.giraph.job.DefaultJobObserver; import org.apache.giraph.job.GiraphJobObserver; +import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.job.HaltApplicationUtils; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; @@ -193,6 +195,13 @@ public interface GiraphConstants { DefaultJobObserver.class, GiraphJobObserver.class, "Observer class to watch over job status - optional"); + /** Class which decides whether a failed job should be retried - optional */ + ClassConfOption<GiraphJobRetryChecker> JOB_RETRY_CHECKER_CLASS = + ClassConfOption.create("giraph.jobRetryCheckerClass", + DefaultGiraphJobRetryChecker.class, GiraphJobRetryChecker.class, + "Class which decides whether a failed job should be retried - " + + "optional"); + // At least one of the input format classes is required. 
/** VertexInputFormat class */ ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS = http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 6bb6c00..b33938a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -51,6 +51,7 @@ import org.apache.giraph.io.superstep_output.NoOpSuperstepOutput; import org.apache.giraph.io.superstep_output.SuperstepOutput; import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput; import org.apache.giraph.job.GiraphJobObserver; +import org.apache.giraph.job.GiraphJobRetryChecker; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; import org.apache.giraph.master.SuperstepClasses; @@ -698,6 +699,15 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create job retry checker + * + * @return GiraphJobRetryChecker set in configuration. + */ + public GiraphJobRetryChecker getJobRetryChecker() { + return ReflectionUtils.newInstance(getJobRetryCheckerClass(), this); + } + + /** * Get the user's subclassed edge value class. 
* * @return User's vertex edge value class http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java new file mode 100644 index 0000000..0cab86c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.job; + +import org.apache.hadoop.mapreduce.Job; + +/** + * Default implementation of {@link GiraphJobRetryChecker}, + * which never retries the job. 
+ */ +public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker { + @Override + public boolean shouldRetry(Job submittedJob, int tryCount) { + // By default, don't retry failed jobs + return false; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index fca14ac..40670bb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -231,27 +231,37 @@ public class GiraphJob { ImmutableClassesGiraphConfiguration conf = new ImmutableClassesGiraphConfiguration(giraphConfiguration); checkLocalJobRunnerConfiguration(conf); - Job submittedJob = new Job(conf, jobName); - if (submittedJob.getJar() == null) { - submittedJob.setJarByClass(getClass()); - } - submittedJob.setNumReduceTasks(0); - submittedJob.setMapperClass(GraphMapper.class); - submittedJob.setInputFormatClass(BspInputFormat.class); - submittedJob.setOutputFormatClass(BspOutputFormat.class); - GiraphJobObserver jobObserver = conf.getJobObserver(); - jobObserver.launchingJob(submittedJob); - submittedJob.submit(); - if (LOG.isInfoEnabled()) { - LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL()); - } - HaltApplicationUtils.printHaltInfo(submittedJob, conf); - jobObserver.jobRunning(submittedJob); + int tryCount = 0; + GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker(); + while (true) { + tryCount++; + Job submittedJob = new Job(conf, jobName); + if (submittedJob.getJar() == null) { + submittedJob.setJarByClass(getClass()); + } + submittedJob.setNumReduceTasks(0); + submittedJob.setMapperClass(GraphMapper.class); + submittedJob.setInputFormatClass(BspInputFormat.class); + 
submittedJob.setOutputFormatClass(BspOutputFormat.class); - boolean passed = submittedJob.waitForCompletion(verbose); - jobObserver.jobFinished(submittedJob, passed); + GiraphJobObserver jobObserver = conf.getJobObserver(); + jobObserver.launchingJob(submittedJob); + submittedJob.submit(); + if (LOG.isInfoEnabled()) { + LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL()); + } + HaltApplicationUtils.printHaltInfo(submittedJob, conf); + jobObserver.jobRunning(submittedJob); - return passed; + boolean passed = submittedJob.waitForCompletion(verbose); + jobObserver.jobFinished(submittedJob, passed); + if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) { + return passed; + } + if (LOG.isInfoEnabled()) { + LOG.info("run: Retrying job, " + tryCount + " try"); + } + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f2b28e14/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java new file mode 100644 index 0000000..53a800e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.job; + +import org.apache.hadoop.mapreduce.Job; + +/** + * Interface to decide whether a GiraphJob should be restarted after failure. + */ +public interface GiraphJobRetryChecker { + /** + * Check if the job should be retried + * + * @param submittedJob Job that ran and failed + * @param tryCount How many times we have tried to run the job so far + * + * @return True iff job should be retried + */ + boolean shouldRetry(Job submittedJob, int tryCount); +}
