Updated Branches: refs/heads/trunk 5bb956cad -> 974c30b74
GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in observers' applicationFailed() method Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/974c30b7 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/974c30b7 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/974c30b7 Branch: refs/heads/trunk Commit: 974c30b7485b653ba37e11d43b474b458134c6ce Parents: 5bb956c Author: Nitay Joffe <[email protected]> Authored: Thu Mar 21 14:21:08 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Thu Mar 21 14:21:08 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 3 + .../org/apache/giraph/master/BspServiceMaster.java | 56 +++++++++++---- 2 files changed, 46 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/974c30b7/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d1983d0..765dc37 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-576: BspServiceMaster.failureCleanup() shouldn't pass null in + observers' applicationFailed() method (jgarms via nitay) + GIRAPH-547: Allow in-place modification of edges (apresta) GIRAPH-537: Fix log messages produced by aggregators (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/974c30b7/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 87497b8..9188a23 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -226,6 +226,22 @@ public class BspServiceMaster<I extends WritableComparable, public void setJobState(ApplicationState state, long applicationAttempt, long desiredSuperstep) { + setJobState(state, applicationAttempt, desiredSuperstep, true); + } + + /** + * Set the job state. + * + * @param state State of the application. + * @param applicationAttempt Attempt to start on + * @param desiredSuperstep Superstep to restart from (if applicable) + * @param killJobOnFailure if true, and the desired state is FAILED, + * then kill this job. + */ + private void setJobState(ApplicationState state, + long applicationAttempt, + long desiredSuperstep, + boolean killJobOnFailure) { JSONObject jobState = new JSONObject(); try { jobState.put(JSONOBJ_STATE_KEY, state.toString()); @@ -258,10 +274,21 @@ public class BspServiceMaster<I extends WritableComparable, "setJobState: Unknown InterruptedException for " + masterJobStatePath, e); } - - if (state == ApplicationState.FAILED) { - failJob(); + if (state == ApplicationState.FAILED && killJobOnFailure) { + failJob(new IllegalStateException("FAILED")); } + + } + + /** + * Set the job state to FAILED. This will kill the job, and log exceptions to + * any observers. + * + * @param reason The reason the job failed + */ + private void setJobStateFailed(String reason) { + setJobState(ApplicationState.FAILED, -1, -1, false); + failJob(new IllegalStateException(reason)); } /** @@ -308,9 +335,12 @@ public class BspServiceMaster<I extends WritableComparable, /** * When there is no salvaging this job, fail it. + * + * @param e Exception to log to observers */ - private void failJob() { + private void failJob(Exception e) { LOG.fatal("failJob: Killing job " + getJobId()); + LOG.fatal("failJob: exception " + e.toString()); try { @SuppressWarnings("deprecation") org.apache.hadoop.mapred.JobClient jobClient = @@ -320,10 +350,11 @@ public class BspServiceMaster<I extends WritableComparable, @SuppressWarnings("deprecation") JobID jobId = JobID.forName(getJobId()); RunningJob job = jobClient.getJob(jobId); - failureCleanup(null); job.killJob(); - } catch (IOException e) { - throw new RuntimeException(e); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } finally { + failureCleanup(e); } } @@ -572,7 +603,7 @@ public class BspServiceMaster<I extends WritableComparable, // where it left off. List<WorkerInfo> healthyWorkerInfoList = checkWorkers(); if (healthyWorkerInfoList == null) { - setJobState(ApplicationState.FAILED, -1, -1); + setJobStateFailed("Not enough healthy workers to create input splits"); return -1; } @@ -590,7 +621,7 @@ public class BspServiceMaster<I extends WritableComparable, "check input of " + inputFormat.getClass().getName() + "!"); getContext().setStatus("Failing job due to 0 input splits, " + "check input of " + inputFormat.getClass().getName() + "!"); - failJob(); + setJobStateFailed("0 input splits"); } if (minSplitCountHint > splitList.size()) { LOG.warn(logPrefix + ": Number of inputSplits=" + @@ -1189,7 +1220,7 @@ public class BspServiceMaster<I extends WritableComparable, } catch (IOException e) { LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " + "found, killing the job.", e); - failJob(); + failJob(e); } } @@ -1401,9 +1432,8 @@ public class BspServiceMaster<I extends WritableComparable, chosenWorkerInfoList = checkWorkers(); if (chosenWorkerInfoList == null) { - LOG.fatal("coordinateSuperstep: Not enough healthy workers for " + - "superstep " + getSuperstep()); - setJobState(ApplicationState.FAILED, -1, -1); + setJobStateFailed("coordinateSuperstep: Not enough healthy workers for " + + "superstep " + getSuperstep()); } else { for (WorkerInfo workerInfo : chosenWorkerInfoList) { String workerInfoHealthyPath =
