GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5adca63d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5adca63d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5adca63d Branch: refs/heads/release-1.1 Commit: 5adca63deca25d84f4fdea053c35a85efc8bbb3d Parents: bc9f823 Author: Maja Kabiljo <[email protected]> Authored: Fri Aug 15 15:03:19 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Aug 15 15:03:19 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/bsp/BspService.java | 108 ++++++---- .../apache/giraph/bsp/CentralizedService.java | 9 - .../giraph/bsp/CentralizedServiceMaster.java | 4 +- .../giraph/bsp/CentralizedServiceWorker.java | 8 + .../org/apache/giraph/bsp/CheckpointStatus.java | 31 +++ .../org/apache/giraph/bsp/SuperstepState.java | 30 ++- .../java/org/apache/giraph/comm/ServerData.java | 10 + .../org/apache/giraph/conf/GiraphConstants.java | 10 +- .../giraph/graph/FinishedSuperstepStats.java | 20 +- .../org/apache/giraph/graph/GlobalStats.java | 27 ++- .../apache/giraph/graph/GraphTaskManager.java | 44 ++-- .../job/DefaultGiraphJobRetryChecker.java | 5 + .../java/org/apache/giraph/job/GiraphJob.java | 23 ++ .../giraph/job/GiraphJobRetryChecker.java | 6 + .../java/org/apache/giraph/job/HadoopUtils.java | 15 ++ .../apache/giraph/master/BspServiceMaster.java | 147 +++++++++---- .../org/apache/giraph/master/MasterThread.java | 10 +- .../apache/giraph/utils/CheckpointingUtils.java | 62 ++++++ .../org/apache/giraph/utils/WritableUtils.java | 63 ++++++ .../apache/giraph/worker/BspServiceWorker.java | 47 ++++- .../apache/giraph/utils/TestWritableUtils.java | 70 +++++++ .../org/apache/giraph/TestCheckpointing.java | 208 +++++++++++++++---- pom.xml | 2 +- 24 files changed, 787 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 66136b2..b64ce2c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-933: Checkpointing improvements (edunov via majakabiljo) + GIRAPH-943: Perf regression due to netty 4.0.21 (pavanka) GIRAPH-935: Loosen modifiers when needed (ikabiljo via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 02577b9..c418a89 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -24,13 +24,16 @@ import org.apache.giraph.graph.GraphTaskManager; import org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.partition.GraphPartitionerFactory; +import org.apache.giraph.utils.CheckpointingUtils; import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperManager; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -50,10 +53,10 @@ import java.net.UnknownHostException; import java.nio.charset.Charset; import java.security.InvalidParameterException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; 
import java.util.List; -import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY; import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID; /** @@ -162,6 +165,8 @@ public abstract class BspService<I extends WritableComparable, public static final String WORKER_PROGRESSES = "/_workerProgresses"; /** Denotes that computation should be halted */ public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; + /** User sets this flag to checkpoint and stop the job */ + public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop"; /** Denotes which workers have been cleaned up */ public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; /** JSON partition stats key */ @@ -283,8 +288,6 @@ public abstract class BspService<I extends WritableComparable, private final GraphTaskManager<I, V, E> graphTaskManager; /** File system */ private final FileSystem fs; - /** Checkpoint frequency */ - private final int checkpointFrequency; /** * Constructor. 
@@ -325,13 +328,6 @@ public abstract class BspService<I extends WritableComparable, this.taskPartition = conf.getTaskPartition(); this.restartedSuperstep = conf.getLong( GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); - this.cachedSuperstep = restartedSuperstep; - if ((restartedSuperstep != UNSET_SUPERSTEP) && - (restartedSuperstep < 0)) { - throw new IllegalArgumentException( - "BspService: Invalid superstep to restart - " + - restartedSuperstep); - } try { this.hostname = conf.getLocalHostname(); } catch (UnknownHostException e) { @@ -340,8 +336,6 @@ public abstract class BspService<I extends WritableComparable, this.hostnamePartitionId = hostname + "_" + getTaskPartition(); this.graphPartitionerFactory = conf.createGraphPartitioner(); - this.checkpointFrequency = conf.getCheckpointFrequency(); - basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath); @@ -360,13 +354,14 @@ public abstract class BspService<I extends WritableComparable, cleanedUpPath = basePath + CLEANED_UP_DIR; String restartJobId = RESTART_JOB_ID.get(conf); + savedCheckpointBasePath = - CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), - CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + - (restartJobId == null ? getJobId() : restartJobId)); - checkpointBasePath = - CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), - CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId()); + CheckpointingUtils.getCheckpointBasePath(getConfiguration(), + restartJobId == null ? getJobId() : restartJobId); + + checkpointBasePath = CheckpointingUtils. 
+ getCheckpointBasePath(getConfiguration(), getJobId()); + masterElectionPath = basePath + MASTER_ELECTION_DIR; myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition; String serverPortList = conf.getZookeeperList(); @@ -392,6 +387,24 @@ public abstract class BspService<I extends WritableComparable, } catch (IOException e) { throw new RuntimeException(e); } + + //Trying to restart from the latest superstep + if (restartJobId != null && + restartedSuperstep == UNSET_SUPERSTEP) { + try { + restartedSuperstep = getLastCheckpointedSuperstep(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + this.cachedSuperstep = restartedSuperstep; + if ((restartedSuperstep != UNSET_SUPERSTEP) && + (restartedSuperstep < 0)) { + throw new IllegalArgumentException( + "BspService: Invalid superstep to restart - " + + restartedSuperstep); + } + } /** @@ -643,28 +656,6 @@ public abstract class BspService<I extends WritableComparable, } /** - * Should checkpoint on this superstep? If checkpointing, always - * checkpoint the first user superstep. If restarting, the first - * checkpoint is after the frequency has been met. - * - * @param superstep Decide if checkpointing no this superstep - * @return True if this superstep should be checkpointed, false otherwise - */ - public final boolean checkpointFrequencyMet(long superstep) { - if (checkpointFrequency == 0) { - return false; - } - long firstCheckpoint = INPUT_SUPERSTEP + 1; - if (getRestartedSuperstep() != UNSET_SUPERSTEP) { - firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; - } - if (superstep < firstCheckpoint) { - return false; - } - return ((superstep - firstCheckpoint) % checkpointFrequency) == 0; - } - - /** * Get the file system * * @return file system @@ -1241,4 +1232,41 @@ public abstract class BspService<I extends WritableComparable, } return eventProcessed; } + + /** + * Get the last saved superstep. 
+ * + * @return Last good superstep number + * @throws IOException + */ + protected long getLastCheckpointedSuperstep() throws IOException { + FileStatus[] fileStatusArray = + getFs().listStatus(new Path(savedCheckpointBasePath), + new FinalizedCheckpointPathFilter()); + if (fileStatusArray == null) { + return -1; + } + Arrays.sort(fileStatusArray); + long lastCheckpointedSuperstep = getCheckpoint( + fileStatusArray[fileStatusArray.length - 1].getPath()); + if (LOG.isInfoEnabled()) { + LOG.info("getLastGoodCheckpoint: Found last good checkpoint " + + lastCheckpointedSuperstep + " from " + + fileStatusArray[fileStatusArray.length - 1]. + getPath().toString()); + } + return lastCheckpointedSuperstep; + } + + /** + * Only get the finalized checkpoint files + */ + private static class FinalizedCheckpointPathFilter implements PathFilter { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX); + } + + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java index ff3e427..560f1fb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java @@ -51,15 +51,6 @@ public interface CentralizedService<I extends WritableComparable, long getRestartedSuperstep(); /** - * Given a superstep, should it be checkpointed based on the - * checkpoint frequency? - * - * @param superstep superstep to check against frequency - * @return true if checkpoint frequency met or superstep is 1. 
- */ - boolean checkpointFrequencyMet(long superstep); - - /** * Get list of workers * * @return List of workers http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java index e5b7cf3..9b4f9d6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java @@ -175,7 +175,9 @@ public interface CentralizedServiceMaster<I extends WritableComparable, * * @throws IOException * @throws InterruptedException + * @param superstepState what was the state + * of the last complete superstep? */ - void cleanup() + void cleanup(SuperstepState superstepState) throws IOException, InterruptedException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java index e5d0ae1..37aed45 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java @@ -21,6 +21,7 @@ package org.apache.giraph.bsp; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClient; import org.apache.giraph.graph.FinishedSuperstepStats; +import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.graph.GraphTaskManager; import org.apache.giraph.graph.VertexEdgeCount; import 
org.apache.giraph.io.superstep_output.SuperstepOutput; @@ -237,4 +238,11 @@ public interface CentralizedServiceWorker<I extends WritableComparable, */ void cleanup(FinishedSuperstepStats finishedSuperstepStats) throws IOException, InterruptedException; + + /** + * Loads Global stats from zookeeper. + * @return global stats stored in zookeeper for + * previous superstep. + */ + GlobalStats getGlobalStats(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java new file mode 100644 index 0000000..74db490 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CheckpointStatus.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.bsp; + +/** + * Enum represents possible checkpoint state. 
+ */ +public enum CheckpointStatus { + /** Do nothing, no checkpoint required */ + NONE, + /** Regular checkpoint */ + CHECKPOINT, + /** Do checkpoint and then halt further computation */ + CHECKPOINT_AND_HALT +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java index c384fbf..768278b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/SuperstepState.java @@ -23,11 +23,33 @@ package org.apache.giraph.bsp; */ public enum SuperstepState { /** Nothing happened yet */ - INITIAL, + INITIAL(false), /** A worker died during this superstep */ - WORKER_FAILURE, + WORKER_FAILURE(false), /** This superstep completed correctly */ - THIS_SUPERSTEP_DONE, + THIS_SUPERSTEP_DONE(false), /** All supersteps are complete */ - ALL_SUPERSTEPS_DONE, + ALL_SUPERSTEPS_DONE(true), + /** Execution halted */ + CHECKPOINT_AND_HALT(true); + + /** Should we stop execution after this superstep? */ + private boolean executionComplete; + + /** + * Enum constructor + * @param executionComplete is final state? + */ + SuperstepState(boolean executionComplete) { + this.executionComplete = executionComplete; + } + + /** + * Returns true if execution has to be stopped after this + * superstep. + * @return whether execution is complete. 
+ */ + public boolean isExecutionComplete() { + return executionComplete; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index a92cd1c..1fd85e4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -269,4 +269,14 @@ public class ServerData<I extends WritableComparable, public void addIncomingWorkerToWorkerMessage(Writable message) { incomingWorkerToWorkerMessages.add(message); } + + + /** + * Get worker to worker messages received in previous superstep. + * @return list of current worker to worker messages. + */ + public List<Writable> getCurrentWorkerToWorkerMessages() { + return currentWorkerToWorkerMessages; + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 0424a47..da0a8db 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -1137,12 +1137,16 @@ public interface GiraphConstants { new IntConfOption("giraph.checkpoint.io.threads", 8, "Number of threads for writing and reading checkpoints"); - /** Compression algorithm to be used for checkpointing */ + /** + * Compression algorithm to be used for checkpointing. + * Defined by extension for hadoop compatibility reasons. 
+ */ StrConfOption CHECKPOINT_COMPRESSION_CODEC = new StrConfOption("giraph.checkpoint.compression.codec", - "org.apache.hadoop.io.compress.DefaultCodec", + ".deflate", "Defines compression algorithm we will be using for " + - "storing checkpoint"); + "storing checkpoint. Available options include but " + + "not restricted to: .deflate, .gz, .bz2, .lzo"); /** Number of threads to use in async message store, 0 means * we should not use async message processing */ http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java index c351778..f7895a9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java @@ -17,6 +17,8 @@ */ package org.apache.giraph.graph; +import org.apache.giraph.bsp.CheckpointStatus; + /** * Immutable graph stats after the completion of a superstep */ @@ -27,6 +29,11 @@ public class FinishedSuperstepStats extends VertexEdgeCount { private final boolean allVerticesHalted; /** Needs to load a checkpoint */ private final boolean mustLoadCheckpoint; + /** + * Master decides when we need to checkpoint and what should + * we do next. + */ + private final CheckpointStatus checkpointStatus; /** * Constructor. @@ -36,16 +43,19 @@ public class FinishedSuperstepStats extends VertexEdgeCount { * @param numVertices Number of vertices * @param numEdges Number of edges * @param mustLoadCheckpoint Has to load a checkpoint? + * @param checkpointStatus Should we checkpoint after this superstep? 
*/ public FinishedSuperstepStats(long numLocalVertices, boolean allVerticesHalted, long numVertices, long numEdges, - boolean mustLoadCheckpoint) { + boolean mustLoadCheckpoint, + CheckpointStatus checkpointStatus) { super(numVertices, numEdges); this.localVertexCount = numLocalVertices; this.allVerticesHalted = allVerticesHalted; this.mustLoadCheckpoint = mustLoadCheckpoint; + this.checkpointStatus = checkpointStatus; } public long getLocalVertexCount() { @@ -69,4 +79,12 @@ public class FinishedSuperstepStats extends VertexEdgeCount { public boolean mustLoadCheckpoint() { return mustLoadCheckpoint; } + + /** + * What master thinks about checkpointing after this superstep. + * @return CheckpointStatus that reflects master decision. + */ + public CheckpointStatus getCheckpointStatus() { + return checkpointStatus; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java index bc56c9c..e11f02c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GlobalStats.java @@ -22,6 +22,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.partition.PartitionStats; import org.apache.hadoop.io.Writable; @@ -41,6 +42,12 @@ public class GlobalStats implements Writable { private long messageBytesCount = 0; /** Whether the computation should be halted */ private boolean haltComputation = false; + /** + * Master's decision on whether we should checkpoint and + * what to do next. + */ + private CheckpointStatus checkpointStatus = + CheckpointStatus.NONE; /** * Add the stats of a partition to the global stats. 
@@ -81,6 +88,14 @@ public class GlobalStats implements Writable { haltComputation = value; } + public CheckpointStatus getCheckpointStatus() { + return checkpointStatus; + } + + public void setCheckpointStatus(CheckpointStatus checkpointStatus) { + this.checkpointStatus = checkpointStatus; + } + /** * Add messages to the global stats. * @@ -107,6 +122,11 @@ public class GlobalStats implements Writable { messageCount = input.readLong(); messageBytesCount = input.readLong(); haltComputation = input.readBoolean(); + if (input.readBoolean()) { + checkpointStatus = CheckpointStatus.values()[input.readInt()]; + } else { + checkpointStatus = null; + } } @Override @@ -117,6 +137,10 @@ public class GlobalStats implements Writable { output.writeLong(messageCount); output.writeLong(messageBytesCount); output.writeBoolean(haltComputation); + output.writeBoolean(checkpointStatus != null); + if (checkpointStatus != null) { + output.writeInt(checkpointStatus.ordinal()); + } } @Override @@ -124,6 +148,7 @@ public class GlobalStats implements Writable { return "(vtx=" + vertexCount + ",finVtx=" + finishedVertexCount + ",edges=" + edgeCount + ",msgCount=" + messageCount + ",msgBytesCount=" + - messageBytesCount + ",haltComputation=" + haltComputation + ")"; + messageBytesCount + ",haltComputation=" + haltComputation + + ", checkpointStatus=" + checkpointStatus + ')'; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 6ebb002..8a97939 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -21,6 +21,7 @@ package org.apache.giraph.graph; import 
org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -120,7 +121,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN; /** Superstep stats */ private FinishedSuperstepStats finishedSuperstepStats = - new FinishedSuperstepStats(0, false, 0, 0, false); + new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE); // Per-Job Metrics /** Timer for WorkerContext#preApplication() */ @@ -281,7 +282,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, context.progress(); serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners); context.progress(); - graphState = checkSuperstepRestarted(superstep, graphState); + boolean hasBeenRestarted = checkSuperstepRestarted(superstep); + + GlobalStats globalStats = serviceWorker.getGlobalStats(); + + if (hasBeenRestarted) { + graphState = new GraphState(superstep, + finishedSuperstepStats.getVertexCount(), + finishedSuperstepStats.getEdgeCount(), + context); + } else if (storeCheckpoint(globalStats.getCheckpointStatus())) { + break; + } prepareForSuperstep(graphState); context.progress(); MessageStore<I, Writable> messageStore = @@ -735,11 +747,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** * Handle the event that this superstep is a restart of a failed one. 
* @param superstep current superstep - * @param graphState the BSP graph state * @return the graph state, updated if this is a restart superstep */ - private GraphState checkSuperstepRestarted(long superstep, - GraphState graphState) throws IOException { + private boolean checkSuperstepRestarted(long superstep) throws IOException { // Might need to restart from another superstep // (manually or automatic), or store a checkpoint if (serviceWorker.getRestartedSuperstep() == superstep) { @@ -750,15 +760,25 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, serviceWorker.getRestartedSuperstep()); finishedSuperstepStats = new FinishedSuperstepStats(0, false, vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(), - false); - graphState = new GraphState(superstep, - finishedSuperstepStats.getVertexCount(), - finishedSuperstepStats.getEdgeCount(), - context); - } else if (serviceWorker.checkpointFrequencyMet(superstep)) { + false, CheckpointStatus.NONE); + return true; + } + return false; + } + + /** + * Check if it's time to checkpoint and actually does checkpointing + * if it is. 
+ * @param checkpointStatus master's decision + * @return true if we need to stop computation after checkpoint + * @throws IOException + */ + private boolean storeCheckpoint(CheckpointStatus checkpointStatus) + throws IOException { + if (checkpointStatus != CheckpointStatus.NONE) { serviceWorker.storeCheckpoint(); } - return graphState; + return checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT; } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java index 0cab86c..edf6bce 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultGiraphJobRetryChecker.java @@ -30,4 +30,9 @@ public class DefaultGiraphJobRetryChecker implements GiraphJobRetryChecker { // By default, don't retry failed jobs return false; } + + @Override + public boolean shouldRestartCheckpoint() { + return false; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 4a1f02e..436126b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -24,9 +24,13 @@ import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphMapper; +import 
org.apache.giraph.utils.CheckpointingUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; import org.apache.log4j.Logger; import java.io.IOException; @@ -261,6 +265,25 @@ public class GiraphJob { jobProgressTracker.stop(); } jobObserver.jobFinished(submittedJob, passed); + + FileSystem fs = FileSystem.get(conf); + JobID jobID = HadoopUtils.getJobID(submittedJob); + if (jobID != null) { + Path checkpointMark = + CheckpointingUtils.getCheckpointMarkPath(conf, jobID.toString()); + + if (fs.exists(checkpointMark)) { + if (retryChecker.shouldRestartCheckpoint()) { + GiraphConstants.RESTART_JOB_ID.set(conf, jobID.toString()); + continue; + } + } + } else { + LOG.warn("jobID is null, are you using hadoop 0.20.203? " + + "Please report this issue here " + + "https://issues.apache.org/jira/browse/GIRAPH-933"); + } + if (passed || !retryChecker.shouldRetry(submittedJob, tryCount)) { return passed; } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java index 53a800e..556b128 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJobRetryChecker.java @@ -33,4 +33,10 @@ public interface GiraphJobRetryChecker { * @return True iff job should be retried */ boolean shouldRetry(Job submittedJob, int tryCount); + + /** + * The job has been checkpointed and halted. Should we now restart it? + * @return true if checkpointed job should be automatically restarted. 
+ */ + boolean shouldRestartCheckpoint(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java index 9530fd6..f2c673b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/HadoopUtils.java @@ -18,6 +18,7 @@ package org.apache.giraph.job; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -102,6 +103,20 @@ public class HadoopUtils { } /** + * Get Job ID from job. + * May return null for hadoop 0.20.203 + * @param job submitted job + * @return JobId for submitted job. + */ + public static JobID getJobID(Job job) { + /*if[HADOOP_JOB_ID_AVAILABLE] + return job.getID(); + else[HADOOP_JOB_ID_AVAILABLE]*/ + return job.getJobID(); + /*end[HADOOP_JOB_ID_AVAILABLE]*/ + } + + /** * Create a JobContext, supporting many Hadoops. 
* * @param conf Configuration http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index e129390..671df23 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -24,6 +24,7 @@ import org.apache.commons.io.FilenameUtils; import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspInputFormat; import org.apache.giraph.bsp.CentralizedServiceMaster; +import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.bsp.SuperstepState; import org.apache.giraph.comm.MasterClient; import org.apache.giraph.comm.MasterServer; @@ -56,6 +57,7 @@ import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.metrics.WorkerSuperstepMetrics; +import org.apache.giraph.utils.CheckpointingUtils; import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.utils.ReactiveJMapHistoDumper; import org.apache.giraph.utils.ProgressableUtils; @@ -67,10 +69,8 @@ import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -99,7 +99,6 @@ import java.io.IOException; import java.io.PrintStream; import 
java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -189,6 +188,11 @@ public class BspServiceMaster<I extends WritableComparable, /** MasterCompute time */ private GiraphTimer masterComputeTimer; + /** Checkpoint frequency */ + private final int checkpointFrequency; + /** Current checkpoint status */ + private CheckpointStatus checkpointStatus; + /** * Constructor for setting up the master. * @@ -224,6 +228,9 @@ public class BspServiceMaster<I extends WritableComparable, } observers = conf.createMasterObservers(); + this.checkpointFrequency = conf.getCheckpointFrequency(); + this.checkpointStatus = CheckpointStatus.NONE; + GiraphMetrics.get().addSuperstepResetObserver(this); GiraphStats.init((Mapper.Context) context); } @@ -365,7 +372,11 @@ public class BspServiceMaster<I extends WritableComparable, @SuppressWarnings("deprecation") JobID jobId = JobID.forName(getJobId()); RunningJob job = jobClient.getJob(jobId); - job.killJob(); + if (job != null) { + job.killJob(); + } else { + LOG.error("Jon not found for jobId=" + getJobId()); + } } } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -1196,11 +1207,11 @@ public class BspServiceMaster<I extends WritableComparable, * * @param chosenWorkerInfoHealthPath Path to the healthy workers in ZooKeeper * @param chosenWorkerInfoList List of the healthy workers - * @return true if they are all alive, false otherwise. + * @return a list of dead workers. Empty list if all workers are alive. 
* @throws InterruptedException * @throws KeeperException */ - private boolean superstepChosenWorkerAlive( + private Collection<WorkerInfo> superstepChosenWorkerAlive( String chosenWorkerInfoHealthPath, List<WorkerInfo> chosenWorkerInfoList) throws KeeperException, InterruptedException { @@ -1208,16 +1219,13 @@ public class BspServiceMaster<I extends WritableComparable, getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false); Set<WorkerInfo> chosenWorkerInfoHealthySet = new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList); - boolean allChosenWorkersHealthy = true; + List<WorkerInfo> deadWorkers = new ArrayList<>(); for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) { - allChosenWorkersHealthy = false; - LOG.error("superstepChosenWorkerAlive: Missing chosen " + - "worker " + chosenWorkerInfo + - " on superstep " + getSuperstep()); + deadWorkers.add(chosenWorkerInfo); } } - return allChosenWorkersHealthy; + return deadWorkers; } @Override @@ -1257,37 +1265,13 @@ public class BspServiceMaster<I extends WritableComparable, } } - /** - * Only get the finalized checkpoint files - */ - public static class FinalizedCheckpointPathFilter implements PathFilter { - @Override - public boolean accept(Path path) { - return path.getName().endsWith(BspService.CHECKPOINT_FINALIZED_POSTFIX); - } - } - @Override public long getLastGoodCheckpoint() throws IOException { // Find the last good checkpoint if none have been written to the // knowledge of this master if (lastCheckpointedSuperstep == -1) { try { - FileStatus[] fileStatusArray = - getFs().listStatus(new Path(savedCheckpointBasePath), - new FinalizedCheckpointPathFilter()); - if (fileStatusArray == null) { - return -1; - } - Arrays.sort(fileStatusArray); - lastCheckpointedSuperstep = getCheckpoint( - fileStatusArray[fileStatusArray.length - 1].getPath()); - if (LOG.isInfoEnabled()) { - LOG.info("getLastGoodCheckpoint: Found last good checkpoint " + - 
lastCheckpointedSuperstep + " from " + - fileStatusArray[fileStatusArray.length - 1]. - getPath().toString()); - } + lastCheckpointedSuperstep = getLastCheckpointedSuperstep(); } catch (IOException e) { LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " + "found, killing the job.", e); @@ -1306,12 +1290,15 @@ public class BspServiceMaster<I extends WritableComparable, * hostname and id * @param workerInfoList List of the workers to wait for * @param event Event to wait on for a chance to be done. + * @param ignoreDeath In case if worker died after making it through + * barrier, we will ignore death if set to true. * @return True if barrier was successful, false if there was a worker * failure */ private boolean barrierOnWorkerList(String finishedWorkerPath, List<WorkerInfo> workerInfoList, - BspEvent event) { + BspEvent event, + boolean ignoreDeath) { try { getZkExt().createOnceExt(finishedWorkerPath, null, @@ -1339,6 +1326,7 @@ public class BspServiceMaster<I extends WritableComparable, final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker final int taskTimeoutMsec = getContext().getConfiguration().getInt( "mapred.task.timeout", defaultTaskTimeoutMsec); + List<WorkerInfo> deadWorkers = new ArrayList<>(); while (true) { try { finishedHostnameIdList = @@ -1389,6 +1377,15 @@ public class BspServiceMaster<I extends WritableComparable, break; } + for (WorkerInfo deadWorker : deadWorkers) { + if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) { + LOG.error("barrierOnWorkerList: no results arived from " + + "worker that was pronounced dead: " + deadWorker + + " on superstep " + getSuperstep()); + return false; + } + } + // Wait for a signal or timeout event.waitMsecs(taskTimeoutMsec / 2); event.reset(); @@ -1396,9 +1393,13 @@ public class BspServiceMaster<I extends WritableComparable, // Did a worker die? 
try { - if (!superstepChosenWorkerAlive( + deadWorkers.addAll(superstepChosenWorkerAlive( workerInfoHealthyPath, - workerInfoList)) { + workerInfoList)); + if (!ignoreDeath && deadWorkers.size() > 0) { + LOG.error("barrierOnWorkerList: Missing chosen " + + "workers " + deadWorkers + + " on superstep " + getSuperstep()); return false; } } catch (KeeperException e) { @@ -1462,7 +1463,8 @@ public class BspServiceMaster<I extends WritableComparable, String logPrefix = "coordinate" + inputSplitsType + "InputSplits"; if (!barrierOnWorkerList(inputSplitPaths.getDonePath(), chosenWorkerInfoList, - inputSplitEvents.getDoneStateChanged())) { + inputSplitEvents.getDoneStateChanged(), + false)) { throw new IllegalStateException(logPrefix + ": Worker failed during " + "input split (currently not supported)"); } @@ -1589,14 +1591,15 @@ public class BspServiceMaster<I extends WritableComparable, // Finalize the valid checkpoint file prefixes and possibly // the aggregators. - if (checkpointFrequencyMet(getSuperstep())) { + if (checkpointStatus != CheckpointStatus.NONE) { String workerWroteCheckpointPath = getWorkerWroteCheckpointPath(getApplicationAttempt(), getSuperstep()); // first wait for all the workers to write their checkpoint data if (!barrierOnWorkerList(workerWroteCheckpointPath, chosenWorkerInfoList, - getWorkerWroteCheckpointEvent())) { + getWorkerWroteCheckpointEvent(), + checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) { return SuperstepState.WORKER_FAILURE; } try { @@ -1606,6 +1609,9 @@ public class BspServiceMaster<I extends WritableComparable, "coordinateSuperstep: IOException on finalizing checkpoint", e); } + if (checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) { + return SuperstepState.CHECKPOINT_AND_HALT; + } } if (getSuperstep() == INPUT_SUPERSTEP) { @@ -1630,7 +1636,8 @@ public class BspServiceMaster<I extends WritableComparable, getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()); if (!barrierOnWorkerList(finishedWorkerPath, 
chosenWorkerInfoList, - getSuperstepStateChangedEvent())) { + getSuperstepStateChangedEvent(), + false)) { return SuperstepState.WORKER_FAILURE; } @@ -1677,10 +1684,14 @@ public class BspServiceMaster<I extends WritableComparable, } getConfiguration().updateSuperstepClasses(superstepClasses); + //Signal workers that we want to checkpoint + checkpointStatus = getCheckpointStatus(getSuperstep() + 1); + globalStats.setCheckpointStatus(checkpointStatus); // Let everyone know the aggregated application state through the // superstep finishing znode. String superstepFinishedNode = getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep()); + WritableUtils.writeToZnode( getZkExt(), superstepFinishedNode, -1, globalStats, superstepClasses); updateCounters(globalStats); @@ -1703,6 +1714,43 @@ public class BspServiceMaster<I extends WritableComparable, } /** + * Should checkpoint on this superstep? If checkpointing, always + * checkpoint the first user superstep. If restarting, the first + * checkpoint is after the frequency has been met. + * + * @param superstep Decide if checkpointing no this superstep + * @return True if this superstep should be checkpointed, false otherwise + */ + private CheckpointStatus getCheckpointStatus(long superstep) { + try { + if (getZkExt(). 
+ exists(basePath + FORCE_CHECKPOINT_USER_FLAG, false) != null) { + return CheckpointStatus.CHECKPOINT_AND_HALT; + } + } catch (KeeperException e) { + throw new IllegalStateException( + "cleanupZooKeeper: Got KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "cleanupZooKeeper: Got IllegalStateException", e); + } + if (checkpointFrequency == 0) { + return CheckpointStatus.NONE; + } + long firstCheckpoint = INPUT_SUPERSTEP + 1; + if (getRestartedSuperstep() != UNSET_SUPERSTEP) { + firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; + } + if (superstep < firstCheckpoint) { + return CheckpointStatus.NONE; + } + if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) { + return CheckpointStatus.CHECKPOINT; + } + return CheckpointStatus.NONE; + } + + /** * This doMasterCompute is only called * after masterCompute is initialized */ @@ -1837,7 +1885,7 @@ public class BspServiceMaster<I extends WritableComparable, } @Override - public void cleanup() throws IOException { + public void cleanup(SuperstepState superstepState) throws IOException { ImmutableClassesGiraphConfiguration conf = getConfiguration(); // All master processes should denote they are done by adding special @@ -1872,7 +1920,8 @@ public class BspServiceMaster<I extends WritableComparable, getGraphTaskManager().setIsMaster(true); cleanUpZooKeeper(); // If desired, cleanup the checkpoint directory - if (GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) { + if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE && + GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(conf)) { boolean success = getFs().delete(new Path(checkpointBasePath), true); if (LOG.isInfoEnabled()) { @@ -1882,6 +1931,12 @@ public class BspServiceMaster<I extends WritableComparable, " succeeded "); } } + if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) { + getFs().create(CheckpointingUtils.getCheckpointMarkPath(conf, + getJobId()), true); + failJob(new 
Exception("Checkpoint and halt requested. " + + "Killing this job.")); + } aggregatorHandler.close(); masterClient.closeConnections(); masterServer.close(); http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java index 0635210..8e4e0b8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java @@ -96,6 +96,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable, long initializeMillis = 0; long endMillis = 0; bspServiceMaster.setup(); + SuperstepState superstepState = SuperstepState.INITIAL; + if (bspServiceMaster.becomeMaster()) { // First call to checkWorkers waits for all pending resources. // If these resources are still available at subsequent calls it just @@ -113,11 +115,9 @@ public class MasterThread<I extends WritableComparable, V extends Writable, long setupMillis = System.currentTimeMillis() - initializeMillis; GiraphTimers.getInstance().getSetupMs().increment(setupMillis); setupSecs = setupMillis / 1000.0d; - SuperstepState superstepState = SuperstepState.INITIAL; - long cachedSuperstep = BspService.UNSET_SUPERSTEP; - while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) { + while (!superstepState.isExecutionComplete()) { long startSuperstepMillis = System.currentTimeMillis(); - cachedSuperstep = bspServiceMaster.getSuperstep(); + long cachedSuperstep = bspServiceMaster.getSuperstep(); GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep); Class<? 
extends Computation> computationClass = bspServiceMaster.getMasterCompute().getComputation(); @@ -153,7 +153,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable, bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1); } } - bspServiceMaster.cleanup(); + bspServiceMaster.cleanup(superstepState); if (!superstepSecsMap.isEmpty()) { GiraphTimers.getInstance().getShutdownMs(). increment(System.currentTimeMillis() - endMillis); http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java new file mode 100644 index 0000000..11d5e4f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/CheckpointingUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY; + +/** + * Holds useful functions to get checkpoint paths + * in hdfs. + */ +public class CheckpointingUtils { + + /** + * Do not call constructor. + */ + private CheckpointingUtils() { + } + + /** + * Path to the checkpoint's root (including job id) + * @param conf Immutable configuration of the job + * @param jobId job ID + * @return checkpoint's root + */ + public static String getCheckpointBasePath(Configuration conf, + String jobId) { + return CHECKPOINT_DIRECTORY.getWithDefault(conf, + CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + jobId); + } + + /** + * Path to checkpoint&halt node in hdfs. + * It is set to let client know that master has + * successfully finished checkpointing and job can be restarted. + * @param conf Immutable configuration of the job + * @param jobId job ID + * @return path to checkpoint&halt node in hdfs. + */ + public static Path getCheckpointMarkPath(Configuration conf, + String jobId) { + return new Path(getCheckpointBasePath(conf, jobId), "halt"); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 763f59d..3c5cbad 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -676,4 +676,67 @@ public class WritableUtils { return null; } } + + /** + * Writes a list of Writable objects into output stream. 
+ * This method is trying to optimize space occupied by class information only + * storing class object if it is different from the previous one + * as in most cases arrays tend to have objects of the same type inside. + * @param list serialized object + * @param output the output stream + * @throws IOException + */ + public static void writeList(List<Writable> list, DataOutput output) + throws IOException { + output.writeInt(list.size()); + Class<? extends Writable> clazz = null; + for (Writable element : list) { + output.writeBoolean(element == null); + if (element != null) { + if (element.getClass() != clazz) { + clazz = element.getClass(); + output.writeBoolean(true); + writeClass(clazz, output); + } else { + output.writeBoolean(false); + } + element.write(output); + } + } + } + + /** + * Reads list of Writable objects from data input stream. + * Input stream should have class information along with object data. + * @param input input stream + * @return deserialized list + * @throws IOException + */ + public static List<Writable> readList(DataInput input) throws IOException { + try { + + int size = input.readInt(); + List<Writable> res = new ArrayList<>(size); + Class<? 
extends Writable> clazz = null; + for (int i = 0; i < size; i++) { + boolean isNull = input.readBoolean(); + if (isNull) { + res.add(null); + } else { + boolean hasClassInfo = input.readBoolean(); + if (hasClassInfo) { + clazz = readClass(input); + } + Writable element = clazz.newInstance(); + element.readFields(input); + res.add(element); + } + } + return res; + + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException("unable to instantiate object", e); + } + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index d2d24ee..447bb6f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -21,6 +21,7 @@ package org.apache.giraph.worker; import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClient; import org.apache.giraph.comm.WorkerClientRequestProcessor; @@ -540,7 +541,8 @@ public class BspServiceWorker<I extends WritableComparable, // 6. Wait for superstep INPUT_SUPERSTEP to complete. 
if (getRestartedSuperstep() != UNSET_SUPERSTEP) { setCachedSuperstep(getRestartedSuperstep()); - return new FinishedSuperstepStats(0, false, 0, 0, true); + return new FinishedSuperstepStats(0, false, 0, 0, true, + CheckpointStatus.NONE); } JSONObject jobState = getJobState(); @@ -557,7 +559,8 @@ public class BspServiceWorker<I extends WritableComparable, getApplicationAttempt()); } setRestartedSuperstep(getSuperstep()); - return new FinishedSuperstepStats(0, false, 0, 0, true); + return new FinishedSuperstepStats(0, false, 0, 0, true, + CheckpointStatus.NONE); } } catch (JSONException e) { throw new RuntimeException( @@ -946,7 +949,8 @@ public class BspServiceWorker<I extends WritableComparable, globalStats.getHaltComputation(), globalStats.getVertexCount(), globalStats.getEdgeCount(), - false); + false, + globalStats.getCheckpointStatus()); } /** @@ -1314,8 +1318,11 @@ public class BspServiceWorker<I extends WritableComparable, throws IOException, InterruptedException { workerClient.closeConnections(); setCachedSuperstep(getSuperstep() - 1); - saveVertices(finishedSuperstepStats.getLocalVertexCount()); - saveEdges(); + if (finishedSuperstepStats.getCheckpointStatus() != + CheckpointStatus.CHECKPOINT_AND_HALT) { + saveVertices(finishedSuperstepStats.getLocalVertexCount()); + saveEdges(); + } WorkerProgress.get().finishStoring(); if (workerProgressWriter != null) { workerProgressWriter.stop(); @@ -1414,6 +1421,10 @@ public class BspServiceWorker<I extends WritableComparable, } + List<Writable> w2wMessages = + getServerData().getCurrentWorkerToWorkerMessages(); + WritableUtils.writeList(w2wMessages, checkpointOutputStream); + checkpointOutputStream.close(); getFs().createNewFile(validFilePath); @@ -1488,9 +1499,9 @@ public class BspServiceWorker<I extends WritableComparable, final CompressionCodec codec = new CompressionCodecFactory(getConfiguration()) - .getCodecByClassName( + .getCodec(new Path( GiraphConstants.CHECKPOINT_COMPRESSION_CODEC - 
.get(getConfiguration())); + .get(getConfiguration()))); long t0 = System.currentTimeMillis(); @@ -1559,9 +1570,9 @@ public class BspServiceWorker<I extends WritableComparable, final CompressionCodec codec = new CompressionCodecFactory(getConfiguration()) - .getCodecByClassName( + .getCodec(new Path( GiraphConstants.CHECKPOINT_COMPRESSION_CODEC - .get(getConfiguration())); + .get(getConfiguration()))); long t0 = System.currentTimeMillis(); @@ -1660,6 +1671,10 @@ public class BspServiceWorker<I extends WritableComparable, getServerData().getCurrentMessageStore().readFieldsForPartition( checkpointStream, partitionId); } + + List<Writable> w2wMessages = WritableUtils.readList(checkpointStream); + getServerData().getCurrentWorkerToWorkerMessages().addAll(w2wMessages); + checkpointStream.close(); if (LOG.isInfoEnabled()) { @@ -1920,4 +1935,18 @@ else[HADOOP_NON_SECURE]*/ public SuperstepOutput<I, V, E> getSuperstepOutput() { return superstepOutput; } + + @Override + public GlobalStats getGlobalStats() { + GlobalStats globalStats = new GlobalStats(); + if (getSuperstep() > Math.max(INPUT_SUPERSTEP, getRestartedSuperstep())) { + String superstepFinishedNode = + getSuperstepFinishedPath(getApplicationAttempt(), + getSuperstep() - 1); + WritableUtils.readFieldsFromZnode( + getZkExt(), superstepFinishedNode, false, null, + globalStats); + } + return globalStats; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java new file mode 100644 index 0000000..c712b5a --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestWritableUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test case for WritableUtils. + */ +public class TestWritableUtils { + + /** + * Tests readList and writeList functions in writable utils. 
+ * @throws IOException + */ + @Test + public void testListSerialization() throws IOException { + List<Writable> list = new ArrayList<>(); + list.add(new LongWritable(1)); + list.add(new LongWritable(2)); + list.add(null); + list.add(new FloatWritable(3)); + list.add(new FloatWritable(4)); + list.add(new LongWritable(5)); + list.add(new LongWritable(6)); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + WritableUtils.writeList(list, dos); + dos.close(); + + byte[] data = bos.toByteArray(); + + DataInputStream input = + new DataInputStream(new ByteArrayInputStream(data)); + + List<Writable> result = WritableUtils.readList(input); + + Assert.assertEquals(list, result); + + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java index 2939af7..9502557 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java @@ -19,8 +19,10 @@ package org.apache.giraph; import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.bsp.BspService; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.examples.SimpleSuperstepComputation; @@ -29,17 +31,25 @@ import org.apache.giraph.graph.Vertex; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.worker.DefaultWorkerContext; +import org.apache.giraph.zk.ZooKeeperExt; 
+import org.apache.giraph.zk.ZooKeeperManager; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.junit.Assert; import org.junit.Test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -56,13 +66,8 @@ public class TestCheckpointing extends BspCase { Logger.getLogger(TestCheckpointing.class); /** ID to be used with test job */ public static final String TEST_JOB_ID = "test_job"; - /** - * Compute will double check that we don't run supersteps - * lesser than specified by this key. That way we ensure that - * computation actually restarted and not recalculated from the - * beginning. 
- */ - public static final String KEY_MIN_SUPERSTEP = "minimum.superstep"; + + private static SuperstepCallback SUPERSTEP_CALLBACK; /** * Create the test case @@ -84,49 +89,45 @@ public class TestCheckpointing extends BspCase { public void testBspCheckpoint(boolean useAsyncMessageStore) throws IOException, InterruptedException, ClassNotFoundException { Path checkpointsDir = getTempPath("checkpointing"); - Path outputPath = getTempPath(getCallingMethodName()); GiraphConfiguration conf = new GiraphConfiguration(); - conf.setComputationClass( - CheckpointComputation.class); - conf.setWorkerContextClass( - CheckpointVertexWorkerContext.class); - conf.setMasterComputeClass( - CheckpointVertexMasterCompute.class); - conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class); - conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class); - conf.set("mapred.job.id", TEST_JOB_ID); - conf.set(KEY_MIN_SUPERSTEP, "0"); if (useAsyncMessageStore) { GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2); } - GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath); - GiraphConfiguration configuration = job.getConfiguration(); - GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString()); - GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false); - configuration.setCheckpointFrequency(2); + SUPERSTEP_CALLBACK = null; - assertTrue(job.run(true)); + GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false); + conf.setCheckpointFrequency(2); - long idSum = 0; - if (!runningInDistributedMode()) { - FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(), - outputPath); - idSum = CheckpointVertexWorkerContext - .getFinalSum(); - LOG.info("testBspCheckpoint: idSum = " + idSum + - " fileLen = " + fileStatus.getLen()); - } + long idSum = runOriginalJob(checkpointsDir, conf); + assertEquals(10, idSum); + + SUPERSTEP_CALLBACK = 
new SuperstepCallback() { + @Override + public void superstep(long superstep, + ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) { + if (superstep < 2) { + Assert.fail("Restarted JOB should not be executed on superstep " + superstep); + } + } + }; + + runRestartedJob(checkpointsDir, conf, idSum, 2); + + + } - // Restart the test from superstep 2 - LOG.info("testBspCheckpoint: Restarting from superstep 2" + - " with checkpoint path = " + checkpointsDir); + private void runRestartedJob(Path checkpointsDir, GiraphConfiguration conf, long idSum, long restartFrom) throws IOException, InterruptedException, ClassNotFoundException { + Path outputPath; + LOG.info("testBspCheckpoint: Restarting from the latest superstep " + + "with checkpoint path = " + checkpointsDir); outputPath = getTempPath("checkpointing_restarted"); GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID); conf.set("mapred.job.id", "restarted_test_job"); - conf.set(GiraphConstants.RESTART_SUPERSTEP, "2"); - conf.set(KEY_MIN_SUPERSTEP, "2"); + if (restartFrom >= 0) { + conf.set(GiraphConstants.RESTART_SUPERSTEP, Long.toString(restartFrom)); + } GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted", conf, outputPath); @@ -135,6 +136,8 @@ public class TestCheckpointing extends BspCase { checkpointsDir.toString()); assertTrue(restartedJob.run(true)); + + if (!runningInDistributedMode()) { long idSumRestarted = CheckpointVertexWorkerContext @@ -145,6 +148,36 @@ public class TestCheckpointing extends BspCase { } } + private long runOriginalJob(Path checkpointsDir, GiraphConfiguration conf) throws IOException, InterruptedException, ClassNotFoundException { + Path outputPath = getTempPath("checkpointing_original"); + conf.setComputationClass( + CheckpointComputation.class); + conf.setWorkerContextClass( + CheckpointVertexWorkerContext.class); + conf.setMasterComputeClass( + CheckpointVertexMasterCompute.class); + 
conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class); + conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class); + conf.set("mapred.job.id", TEST_JOB_ID); + GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath); + + GiraphConfiguration configuration = job.getConfiguration(); + GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString()); + + assertTrue(job.run(true)); + + long idSum = 0; + if (!runningInDistributedMode()) { + FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(), + outputPath); + idSum = CheckpointVertexWorkerContext + .getFinalSum(); + LOG.info("testBspCheckpoint: idSum = " + idSum + + " fileLen = " + fileStatus.getLen()); + } + return idSum; + } + /** * Actual computation. @@ -159,10 +192,6 @@ public class TestCheckpointing extends BspCase { CheckpointVertexWorkerContext workerContext = getWorkerContext(); assertEquals(getSuperstep() + 1, workerContext.testValue); - if (getSuperstep() < getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)){ - fail("Should not be running compute on superstep " + getSuperstep()); - } - if (getSuperstep() > 4) { vertex.voteToHalt(); return; @@ -186,10 +215,76 @@ public class TestCheckpointing extends BspCase { EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue); vertex.addEdge(newEdge); sendMessage(edge.getTargetVertexId(), newEdgeValue); + } } } + @Test + public void testManualCheckpointAtTheBeginning() + throws InterruptedException, IOException, ClassNotFoundException { + testManualCheckpoint(0); + } + + @Test + public void testManualCheckpoint() + throws InterruptedException, IOException, ClassNotFoundException { + testManualCheckpoint(2); + } + + + private void testManualCheckpoint(final int checkpointSuperstep) + throws IOException, InterruptedException, ClassNotFoundException { + Path checkpointsDir = getTempPath("checkpointing"); + GiraphConfiguration conf = 
new GiraphConfiguration(); + + SUPERSTEP_CALLBACK = new SuperstepCallback() { + + @Override + public void superstep(long superstep, ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) { + if (superstep == checkpointSuperstep) { + try { + ZooKeeperExt zooKeeperExt = new ZooKeeperExt(conf.getZookeeperList(), + conf.getZooKeeperSessionTimeout(), + conf.getZookeeperOpsMaxAttempts(), + conf.getZookeeperOpsRetryWaitMsecs(), + TestCheckpointing.this); + String basePath = ZooKeeperManager.getBasePath(conf) + BspService.BASE_DIR + "/" + conf.get("mapred.job.id"); + zooKeeperExt.createExt( + basePath + BspService.FORCE_CHECKPOINT_USER_FLAG, + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (IOException | InterruptedException | KeeperException e) { + throw new RuntimeException(e); + } + } else if (superstep > checkpointSuperstep) { + Assert.fail("Job should be stopped by now " + superstep); + } + } + }; + + try { + runOriginalJob(checkpointsDir, conf); + fail("Original job should fail after checkpointing"); + } catch (Exception e) { + LOG.info("Original job failed, that's OK " + e); + } + + SUPERSTEP_CALLBACK = new SuperstepCallback() { + @Override + public void superstep(long superstep, + ImmutableClassesGiraphConfiguration<LongWritable, IntWritable, FloatWritable> conf) { + if (superstep < checkpointSuperstep) { + Assert.fail("Restarted JOB should not be executed on superstep " + superstep); + } + } + }; + + runRestartedJob(checkpointsDir, conf, 10, -1); + } + /** * Worker context associated. 
*/ @@ -205,6 +300,21 @@ public class TestCheckpointing extends BspCase { } @Override + public void postSuperstep() { + super.postSuperstep(); + sendMessageToMyself(new LongWritable(getSuperstep())); + } + + /** + * Send message to this worker itself (the sender's own worker index) + * + * @param message Message to send + */ + private void sendMessageToMyself(Writable message) { + sendMessageToWorker(message, getMyWorkerIndex()); + } + + @Override public void postApplication() { setFinalSum(this.<LongWritable>getAggregatedValue( LongSumAggregator.class.getName()).get()); @@ -223,6 +333,11 @@ public class TestCheckpointing extends BspCase { @Override public void preSuperstep() { assertEquals(getSuperstep(), testValue++); + if (getSuperstep() > 0) { + List<Writable> messages = getAndClearMessagesFromOtherWorkers(); + assertEquals(1, messages.size()); + assertEquals(getSuperstep() - 1, ((LongWritable)(messages.get(0))).get()); + } } @Override @@ -249,6 +364,9 @@ public class TestCheckpointing extends BspCase { @Override public void compute() { long superstep = getSuperstep(); + if (SUPERSTEP_CALLBACK != null) { + SUPERSTEP_CALLBACK.superstep(getSuperstep(), getConf()); + } assertEquals(superstep, testValue++); } @@ -272,6 +390,12 @@ public class TestCheckpointing extends BspCase { } } + private static interface SuperstepCallback { + public void superstep(long superstep, + ImmutableClassesGiraphConfiguration<LongWritable, + IntWritable, FloatWritable> conf); + + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/5adca63d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 672ec44..b4d78ae 100644 --- a/pom.xml +++ b/pom.xml @@ -1059,7 +1059,7 @@ under the License. 
</modules> <properties> <hadoop.version>0.20.0</hadoop.version> - <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</munge.symbols> + <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_JOB_ID_AVAILABLE</munge.symbols> </properties> <dependencies> <!-- sorted lexicographically -->
