Updated Branches: refs/heads/trunk 57ea55610 -> 533d52159
GIRAPH-464: MasterObserver#applicationFailed callback (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/533d5215 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/533d5215 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/533d5215 Branch: refs/heads/trunk Commit: 533d52159590f87e8e9996a853e7582e67ae8cd6 Parents: 57ea556 Author: Nitay Joffe <[email protected]> Authored: Thu Jan 3 12:29:46 2013 -0500 Committer: Nitay Joffe <[email protected]> Committed: Thu Jan 3 18:09:58 2013 -0500 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/bsp/CentralizedServiceMaster.java | 7 +++++ .../org/apache/giraph/graph/BspServiceMaster.java | 22 ++++++++++++--- .../java/org/apache/giraph/graph/GraphMapper.java | 3 +- .../java/org/apache/giraph/graph/MasterThread.java | 21 +++++--------- .../giraph/master/DefaultMasterObserver.java | 15 +++++----- .../org/apache/giraph/master/MasterObserver.java | 7 +++++ 7 files changed, 49 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 0649b77..e36ffcd 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-464: MasterObserver#applicationFailed callback (nitay) + GIRAPH-458: split formats module into accumulo,hbase,hcatalog (nitay) GIRAPH-463: Create VertexResolver only once (apresta) http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index a328737..c351313 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -132,4 +132,11 @@ public interface CentralizedServiceMaster<I extends WritableComparable, * Application has finished. */ void postApplication(); + + /** + * Called when the job fails in order to let the Master do any cleanup. + * + * @param e Exception job failed from. May be null. + */ + void failureCleanup(Exception e); } http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java index ee64a46..41bbcee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceMaster.java @@ -57,6 +57,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; @@ -293,8 +294,6 @@ public class BspServiceMaster<I extends WritableComparable, /** * When there is no salvaging this job, fail it. - * - * @throws IOException */ private void failJob() { LOG.fatal("failJob: Killing job " + getJobId()); @@ -305,9 +304,9 @@ public class BspServiceMaster<I extends WritableComparable, (org.apache.hadoop.mapred.JobConf) getContext().getConfiguration()); @SuppressWarnings("deprecation") - org.apache.hadoop.mapred.JobID jobId = - org.apache.hadoop.mapred.JobID.forName(getJobId()); + JobID jobId = JobID.forName(getJobId()); RunningJob job = jobClient.getJob(jobId); + failureCleanup(null); job.killJob(); } catch (IOException e) { throw new RuntimeException(e); @@ -1631,6 +1630,21 @@ public class BspServiceMaster<I extends WritableComparable, } @Override + public void failureCleanup(Exception e) { + for (MasterObserver observer : observers) { + try { + observer.applicationFailed(e); + // CHECKSTYLE: stop IllegalCatchCheck + } catch (RuntimeException re) { + // CHECKSTYLE: resume IllegalCatchCheck + LOG.error(re.getClass().getName() + " from observer " + + observer.getClass().getName(), re); + } + getContext().progress(); + } + } + + @Override public void cleanup() throws IOException { // All master processes should denote they are done by adding special // znode. Once the number of znodes equals the number of partitions http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java index eb35723..e491840 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java @@ -416,8 +416,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable, } serviceMaster = new BspServiceMaster<I, V, E, M>( serverPortList, sessionMsecTimeout, context, this); - masterThread = new MasterThread<I, V, E, M>( - (BspServiceMaster<I, V, E, M>) serviceMaster, context); + masterThread = new MasterThread<I, V, E, M>(serviceMaster, context); masterThread.start(); } if ((mapFunctions == MapFunctions.WORKER_ONLY) || http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java index dbded04..e27de42 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/MasterThread.java @@ -28,9 +28,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; @@ -71,7 +69,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable, * been called. * @param context Context from the Mapper. */ - MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster, + MasterThread(CentralizedServiceMaster<I, V, E, M> bspServiceMaster, Context context) { super(MasterThread.class.getName()); this.bspServiceMaster = bspServiceMaster; @@ -173,19 +171,14 @@ public class MasterThread<I extends WritableComparable, V extends Writable, GiraphTimers.getInstance().getTotalMs(). increment(System.currentTimeMillis() - startMillis); } - } catch (IOException e) { + bspServiceMaster.postApplication(); + // CHECKSTYLE: stop IllegalCatchCheck + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatchCheck + bspServiceMaster.failureCleanup(e); LOG.error("masterThread: Master algorithm failed with " + - "IOException ", e); - throw new IllegalStateException(e); - } catch (InterruptedException e) { - LOG.error("masterThread: Master algorithm failed with " + - "InterruptedException", e); - throw new IllegalStateException(e); - } catch (KeeperException e) { - LOG.error("masterThread: Master algorithm failed with " + - "KeeperException", e); + e.getClass().getSimpleName(), e); throw new IllegalStateException(e); } - bspServiceMaster.postApplication(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java index 4b4dee6..f566979 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java @@ -38,18 +38,17 @@ public class DefaultMasterObserver implements MasterObserver { } @Override - public void preApplication() { - } + public void preApplication() { } @Override - public void postApplication() { - } + public void postApplication() { } @Override - public void preSuperstep() { - } + public void applicationFailed(Exception e) { } @Override - public void postSuperstep() { - } + public void preSuperstep() { } + + @Override + public void postSuperstep() { } } http://git-wip-us.apache.org/repos/asf/giraph/blob/533d5215/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java index b8f5a26..a72b18a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java @@ -35,6 +35,13 @@ public interface MasterObserver extends ImmutableClassesGiraphConfigurable { void postApplication(); /** + * If there is an error during the application. + * + * @param e Exception that caused failure. May be null. + */ + void applicationFailed(Exception e); + + /** * Before each superstep starts. */ void preSuperstep();
