GIRAPH-924: Fix checkpointing (edunov via majakabiljo)
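For context, a minimal sketch of how the restart path introduced by this patch might be driven. It is illustrative only: the class name RestartFromCheckpointSketch, the job ID string, and the job name are placeholders; input/output format and worker setup are omitted; SimpleSuperstepComputation is the example computation the new test imports; and CHECKPOINT_FREQUENCY is assumed to be the pre-existing option controlling how often checkpoints are written. The options RESTART_JOB_ID, NUM_CHECKPOINT_IO_THREADS, and CHECKPOINT_COMPRESSION_CODEC are the ones added by this change (see the GiraphConstants hunk below).

    import org.apache.giraph.conf.GiraphConfiguration;
    import org.apache.giraph.conf.GiraphConstants;
    import org.apache.giraph.examples.SimpleSuperstepComputation;
    import org.apache.giraph.job.GiraphJob;

    public class RestartFromCheckpointSketch {
      public static void main(String[] args) throws Exception {
        GiraphConfiguration conf = new GiraphConfiguration();
        conf.setComputationClass(SimpleSuperstepComputation.class);

        // Checkpoint every other superstep so there is something to restore.
        GiraphConstants.CHECKPOINT_FREQUENCY.set(conf, 2);

        // New in this patch: point the job at the checkpoints written by a
        // previous (failed) job, identified by that job's ID (placeholder).
        GiraphConstants.RESTART_JOB_ID.set(conf, "job_201407160001_0042");

        // New tuning knobs: parallel checkpoint I/O and the compression
        // codec used when storing checkpoint data.
        GiraphConstants.NUM_CHECKPOINT_IO_THREADS.set(conf, 8);
        GiraphConstants.CHECKPOINT_COMPRESSION_CODEC.set(conf,
            "org.apache.hadoop.io.compress.DefaultCodec");

        GiraphJob job = new GiraphJob(conf, "restarted-job");
        System.exit(job.run(true) ? 0 : -1);
      }
    }

When launching through GiraphRunner, passing -Dgiraph.restart.jobId=<old job ID> on the command line should be equivalent, since the runner goes through Hadoop's standard ToolRunner option handling.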
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02d9e6c2 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02d9e6c2 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02d9e6c2 Branch: refs/heads/release-1.1 Commit: 02d9e6c2533a3cd108f5e6feaf40f26e95deb64c Parents: 0a90177 Author: Maja Kabiljo <[email protected]> Authored: Wed Jul 16 10:30:30 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Wed Jul 16 10:30:30 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/aggregators/AggregatorWrapper.java | 6 +- .../java/org/apache/giraph/bsp/BspService.java | 24 ++ .../java/org/apache/giraph/comm/ServerData.java | 17 + .../org/apache/giraph/conf/GiraphConstants.java | 19 + .../apache/giraph/master/BspServiceMaster.java | 179 ++++---- .../giraph/partition/BasicPartitionOwner.java | 10 - .../giraph/partition/HashMasterPartitioner.java | 6 + .../giraph/partition/HashWorkerPartitioner.java | 5 +- .../partition/MasterGraphPartitioner.java | 7 + .../giraph/partition/PartitionBalancer.java | 4 +- .../apache/giraph/partition/PartitionOwner.java | 16 - .../partition/SimpleMasterPartitioner.java | 6 + .../partition/SimpleWorkerPartitioner.java | 6 +- .../partition/WorkerGraphPartitioner.java | 5 +- .../giraph/utils/InternalVertexRunner.java | 260 ++++++++---- .../utils/io/ExtendedDataInputOutput.java | 2 +- .../apache/giraph/worker/BspServiceWorker.java | 404 ++++++++++++------- .../org/apache/giraph/worker/WorkerContext.java | 16 +- .../SimpleRangePartitionFactoryTest.java | 2 +- .../org/apache/giraph/TestCheckpointing.java | 266 ++++++++++++ 21 files changed, 882 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 7287490..4207339 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-924: Fix checkpointing (edunov via majakabiljo) + GIRAPH-921: Create ByteValueVertex to store vertex values as bytes without object instance (akyrola via majakabiljo) GIRAPH-929: setIfUnset for EnumConfOption (pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java index 9613805..7150402 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java @@ -44,13 +44,9 @@ public class AggregatorWrapper<A extends Writable> { * @param persistent False iff aggregator should be reset at the end of * each super step * @param conf Configuration - * @throws IllegalAccessException - * @throws InstantiationException */ public AggregatorWrapper(Class<? 
extends Aggregator<A>> aggregatorClass, - boolean persistent, ImmutableClassesGiraphConfiguration conf) throws - IllegalAccessException, - InstantiationException { + boolean persistent, ImmutableClassesGiraphConfiguration conf) { this.persistent = persistent; currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf); changed = false; http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 2e35373..02577b9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.List; import static org.apache.giraph.conf.GiraphConstants.CHECKPOINT_DIRECTORY; +import static org.apache.giraph.conf.GiraphConstants.RESTART_JOB_ID; /** * Zookeeper-based implementation of {@link CentralizedService}. @@ -198,6 +199,11 @@ public abstract class BspService<I extends WritableComparable, */ public static final String CHECKPOINT_VALID_POSTFIX = ".valid"; /** + * If at the end of a checkpoint file, + * indicates that we store WorkerContext and aggregator handler data. + */ + public static final String CHECKPOINT_DATA_POSTFIX = ".data"; + /** * If at the end of a checkpoint file, indicates the stitched checkpoint * file prefixes. A checkpoint is not valid if this file does not exist. */ @@ -226,6 +232,8 @@ public abstract class BspService<I extends WritableComparable, protected final String cleanedUpPath; /** Path to the checkpoint's root (including job id) */ protected final String checkpointBasePath; + /** Old checkpoint in case we want to restart some job */ + protected final String savedCheckpointBasePath; /** Path to the master election path */ protected final String masterElectionPath; /** Stores progress info of this worker */ @@ -350,6 +358,12 @@ public abstract class BspService<I extends WritableComparable, EDGE_INPUT_SPLITS_ALL_READY_NODE, EDGE_INPUT_SPLITS_ALL_DONE_NODE); applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; cleanedUpPath = basePath + CLEANED_UP_DIR; + + String restartJobId = RESTART_JOB_ID.get(conf); + savedCheckpointBasePath = + CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), + CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + + (restartJobId == null ? getJobId() : restartJobId)); checkpointBasePath = CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId()); @@ -575,6 +589,16 @@ public abstract class BspService<I extends WritableComparable, } /** + * When we restart from another job's checkpoint, this gives us the path + * to that saved checkpoint.
+ * @param superstep superstep to use + * @return Directory path for the restarted job based on the superstep + */ + public final String getSavedCheckpointBasePath(long superstep) { + return savedCheckpointBasePath + "/" + superstep; + } + + /** * Get the checkpoint from a finalized checkpoint path * * @param finalizedPath Path of the finalized checkpoint http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 85bfe04..036510e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -161,6 +161,23 @@ public class ServerData<I extends WritableComparable, return (MessageStore<I, M>) currentMessageStore; } + /** + * Re-initialize message stores. + * Discards old values if any. + * @throws IOException + */ + public void resetMessageStores() throws IOException { + if (currentMessageStore != null) { + currentMessageStore.clearAll(); + currentMessageStore = null; + } + if (incomingMessageStore != null) { + incomingMessageStore.clearAll(); + incomingMessageStore = null; + } + prepareSuperstep(); + } + /** Prepare for next super step */ public void prepareSuperstep() { if (currentMessageStore != null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 81c0e0b..3d16e9c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -880,6 +880,13 @@ public interface GiraphConstants { String RESTART_SUPERSTEP = "giraph.restartSuperstep"; /** + * If the application is restarted manually, we need to specify the job ID + * to restart from. + */ + StrConfOption RESTART_JOB_ID = new StrConfOption("giraph.restart.jobId", + null, "Which job ID should I try to restart?"); + + /** * Base ZNode for Giraph's state in the ZooKeeper cluster.
Must be a root * znode on the cluster beginning with "/" */ @@ -1124,5 +1131,17 @@ public interface GiraphConstants { IntConfOption HDFS_FILE_CREATION_RETRY_WAIT_MS = new IntConfOption("giraph.hdfs.file.creation.retry.wait.ms", 30_000, "Milliseconds to wait prior to retrying creation of an HDFS file"); + + /** Number of threads for writing and reading checkpoints */ + IntConfOption NUM_CHECKPOINT_IO_THREADS = + new IntConfOption("giraph.checkpoint.io.threads", 8, + "Number of threads for writing and reading checkpoints"); + + /** Compression algorithm to be used for checkpointing */ + StrConfOption CHECKPOINT_COMPRESSION_CODEC = + new StrConfOption("giraph.checkpoint.compression.codec", + "org.apache.hadoop.io.compress.DefaultCodec", + "Defines compression algorithm we will be using for " + + "storing checkpoint"); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 0275395..e129390 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -18,6 +18,8 @@ package org.apache.giraph.master; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import org.apache.commons.io.FilenameUtils; import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspInputFormat; @@ -42,6 +44,7 @@ import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.graph.GraphTaskManager; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; +import org.apache.giraph.partition.BasicPartitionOwner; import org.apache.giraph.partition.MasterGraphPartitioner; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; @@ -100,10 +103,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Callable; @@ -272,6 +273,7 @@ public class BspServiceMaster<I extends WritableComparable, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, true); + LOG.info("setJobState: " + jobState); } catch (KeeperException.NodeExistsException e) { throw new IllegalStateException( "setJobState: Imposible that " + @@ -740,20 +742,18 @@ public class BspServiceMaster<I extends WritableComparable, * finalized checkpoint file and setting it. * * @param superstep Checkpoint set to examine. - * @param partitionOwners Partition owners to modify with checkpoint - * prefixes * @throws IOException * @throws InterruptedException * @throws KeeperException + * @return Collection of generated partition owners. 
*/ - private void prepareCheckpointRestart( - long superstep, - Collection<PartitionOwner> partitionOwners) + private Collection<PartitionOwner> prepareCheckpointRestart(long superstep) throws IOException, KeeperException, InterruptedException { + List<PartitionOwner> partitionOwners = new ArrayList<>(); FileSystem fs = getFs(); - List<Path> validMetadataPathList = new ArrayList<Path>(); String finalizedCheckpointPath = - getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; + getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; + LOG.info("Loading checkpoint from " + finalizedCheckpointPath); DataInputStream finalizedStream = fs.open(new Path(finalizedCheckpointPath)); GlobalStats globalStats = new GlobalStats(); @@ -763,51 +763,46 @@ public class BspServiceMaster<I extends WritableComparable, superstepClasses.readFields(finalizedStream); getConfiguration().updateSuperstepClasses(superstepClasses); int prefixFileCount = finalizedStream.readInt(); - for (int i = 0; i < prefixFileCount; ++i) { - String metadataFilePath = - finalizedStream.readUTF() + CHECKPOINT_METADATA_POSTFIX; - validMetadataPathList.add(new Path(metadataFilePath)); - } - aggregatorHandler.readFields(finalizedStream); - masterCompute.readFields(finalizedStream); - finalizedStream.close(); - Map<Integer, PartitionOwner> idOwnerMap = - new HashMap<Integer, PartitionOwner>(); - for (PartitionOwner partitionOwner : partitionOwners) { - if (idOwnerMap.put(partitionOwner.getPartitionId(), - partitionOwner) != null) { - throw new IllegalStateException( - "prepareCheckpointRestart: Duplicate partition " + - partitionOwner); - } + Int2ObjectMap<WorkerInfo> workersMap = new Int2ObjectOpenHashMap<>(); + for (WorkerInfo worker : chosenWorkerInfoList) { + workersMap.put(worker.getTaskId(), worker); } - // Reading the metadata files. Simply assign each partition owner - // the correct file prefix based on the partition id. - for (Path metadataPath : validMetadataPathList) { - String checkpointFilePrefix = metadataPath.toString(); - checkpointFilePrefix = - checkpointFilePrefix.substring( - 0, - checkpointFilePrefix.length() - - CHECKPOINT_METADATA_POSTFIX.length()); - DataInputStream metadataStream = fs.open(metadataPath); + String checkpointFile = + finalizedStream.readUTF(); + for (int i = 0; i < prefixFileCount; ++i) { + int mrTaskId = finalizedStream.readInt(); + + DataInputStream metadataStream = fs.open(new Path(checkpointFile + + "." 
+ mrTaskId + CHECKPOINT_METADATA_POSTFIX)); long partitions = metadataStream.readInt(); - for (long i = 0; i < partitions; ++i) { - long dataPos = metadataStream.readLong(); + WorkerInfo worker = workersMap.get(mrTaskId); + for (long p = 0; p < partitions; ++p) { int partitionId = metadataStream.readInt(); - PartitionOwner partitionOwner = idOwnerMap.get(partitionId); - if (LOG.isInfoEnabled()) { - LOG.info("prepareSuperstepRestart: File " + metadataPath + - " with position " + dataPos + - ", partition id = " + partitionId + - " assigned to " + partitionOwner); - } - partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix); + PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId, + worker); + partitionOwners.add(partitionOwner); + LOG.info("prepareCheckpointRestart partitionId=" + partitionId + + " assigned to " + partitionOwner); } metadataStream.close(); } + // Ordering is important: WorkerGraphPartitioner currently relies on + // partition owners being sorted by partition id + Collections.sort(partitionOwners, new Comparator<PartitionOwner>() { + @Override + public int compare(PartitionOwner p1, PartitionOwner p2) { + return Integer.compare(p1.getPartitionId(), p2.getPartitionId()); + } + }); + + + aggregatorHandler.readFields(finalizedStream); + masterCompute.readFields(finalizedStream); + finalizedStream.close(); + + return partitionOwners; } @Override @@ -1085,11 +1080,9 @@ public class BspServiceMaster<I extends WritableComparable, getZkExt().getData(superstepFinishedNode, false, null)); finalizedOutputStream.writeInt(chosenWorkerInfoList.size()); + finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep)); for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { - String chosenWorkerInfoPrefix = - getCheckpointBasePath(superstep) + "." + - chosenWorkerInfo.getHostnameId(); - finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix); + finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId()); } aggregatorHandler.write(finalizedOutputStream); masterCompute.write(finalizedOutputStream); @@ -1104,18 +1097,10 @@ public class BspServiceMaster<I extends WritableComparable, * the workers will know how to do the exchange. If this was a restarted * superstep, then make sure to provide information on where to find the * checkpoint file.
- * - * @param allPartitionStatsList All partition stats - * @param chosenWorkerInfoList All the chosen worker infos - * @param masterGraphPartitioner Master graph partitioner */ - private void assignPartitionOwners( - List<PartitionStats> allPartitionStatsList, - List<WorkerInfo> chosenWorkerInfoList, - MasterGraphPartitioner<I, V, E> masterGraphPartitioner) { + private void assignPartitionOwners() { Collection<PartitionOwner> partitionOwners; - if (getSuperstep() == INPUT_SUPERSTEP || - getSuperstep() == getRestartedSuperstep()) { + if (getSuperstep() == INPUT_SUPERSTEP) { partitionOwners = masterGraphPartitioner.createInitialPartitionOwners( chosenWorkerInfoList, maxWorkers); @@ -1123,23 +1108,10 @@ public class BspServiceMaster<I extends WritableComparable, throw new IllegalStateException( "assignAndExchangePartitions: No partition owners set"); } - } else { - partitionOwners = - masterGraphPartitioner.generateChangedPartitionOwners( - allPartitionStatsList, - chosenWorkerInfoList, - maxWorkers, - getSuperstep()); - - PartitionUtils.analyzePartitionStats(partitionOwners, - allPartitionStatsList); - } - checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners()); - - // If restarted, prepare the checkpoint restart - if (getRestartedSuperstep() == getSuperstep()) { + } else if (getRestartedSuperstep() == getSuperstep()) { + // If restarted, prepare the checkpoint restart try { - prepareCheckpointRestart(getSuperstep(), partitionOwners); + partitionOwners = prepareCheckpointRestart(getSuperstep()); } catch (IOException e) { throw new IllegalStateException( "assignPartitionOwners: IOException on preparing", e); @@ -1151,7 +1123,21 @@ public class BspServiceMaster<I extends WritableComparable, "assignPartitionOwners: InteruptedException on preparing", e); } + masterGraphPartitioner.setPartitionOwners(partitionOwners); + } else { + partitionOwners = + masterGraphPartitioner.generateChangedPartitionOwners( + allPartitionStatsList, + chosenWorkerInfoList, + maxWorkers, + getSuperstep()); + + PartitionUtils.analyzePartitionStats(partitionOwners, + allPartitionStatsList); } + checkPartitions(masterGraphPartitioner.getCurrentPartitionOwners()); + + // There will be some exchange of partitions if (!partitionOwners.isEmpty()) { @@ -1240,18 +1226,9 @@ public class BspServiceMaster<I extends WritableComparable, // 1. Remove all old input split data // 2. Increase the application attempt and set to the correct checkpoint // 3. Send command to all workers to restart their tasks - try { - getZkExt().deleteExt(vertexInputSplitsPaths.getPath(), -1, - true); - getZkExt().deleteExt(edgeInputSplitsPaths.getPath(), -1, - true); - } catch (InterruptedException e) { - throw new RuntimeException( - "restartFromCheckpoint: InterruptedException", e); - } catch (KeeperException e) { - throw new RuntimeException( - "restartFromCheckpoint: KeeperException", e); - } + zkDeleteNode(vertexInputSplitsPaths.getPath()); + zkDeleteNode(edgeInputSplitsPaths.getPath()); + setApplicationAttempt(getApplicationAttempt() + 1); setCachedSuperstep(checkpoint); setRestartedSuperstep(checkpoint); @@ -1261,6 +1238,26 @@ public class BspServiceMaster<I extends WritableComparable, } /** + * Safely removes a node from zookeeper. + * Ignores the case where the node has already been removed. Any other + * failure is wrapped in a runtime exception. + * @param path path to the node to be removed.
+ */ + private void zkDeleteNode(String path) { + try { + getZkExt().deleteExt(path, -1, true); + } catch (KeeperException.NoNodeException e) { + LOG.info("zkDeleteNode: node has already been removed " + path); + } catch (InterruptedException e) { + throw new RuntimeException( + "zkDeleteNode: InterruptedException", e); + } catch (KeeperException e) { + throw new RuntimeException( + "zkDeleteNode: KeeperException", e); + } + } + + /** * Only get the finalized checkpoint files */ public static class FinalizedCheckpointPathFilter implements PathFilter { @@ -1277,7 +1274,7 @@ public class BspServiceMaster<I extends WritableComparable, if (lastCheckpointedSuperstep == -1) { try { FileStatus[] fileStatusArray = - getFs().listStatus(new Path(checkpointBasePath), + getFs().listStatus(new Path(savedCheckpointBasePath), new FinalizedCheckpointPathFilter()); if (fileStatusArray == null) { return -1; @@ -1582,9 +1579,7 @@ public class BspServiceMaster<I extends WritableComparable, GiraphStats.getInstance(). getCurrentWorkers().setValue(chosenWorkerInfoList.size()); - assignPartitionOwners(allPartitionStatsList, - chosenWorkerInfoList, - masterGraphPartitioner); + assignPartitionOwners(); // We need to finalize aggregators from previous superstep (send them to // worker owners) after new worker assignments http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java index 545d1af..b6cf813 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java @@ -103,16 +103,6 @@ public class BasicPartitionOwner implements PartitionOwner, } @Override - public String getCheckpointFilesPrefix() { - return checkpointFilesPrefix; - } - - @Override - public void setCheckpointFilesPrefix(String checkpointFilesPrefix) { - this.checkpointFilesPrefix = checkpointFilesPrefix; - } - - @Override public void writeWithWorkerIds(DataOutput output) throws IOException { output.writeInt(partitionId); output.writeInt(workerInfo.getTaskId()); http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java index 240687e..caede8c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java @@ -18,6 +18,7 @@ package org.apache.giraph.partition; +import com.google.common.collect.Lists; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; @@ -75,6 +76,11 @@ public class HashMasterPartitioner<I extends WritableComparable, } @Override + public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) { + this.partitionOwnerList = Lists.newArrayList(partitionOwners); + } + + @Override public Collection<PartitionOwner> getCurrentPartitionOwners() { return 
partitionOwnerList; } http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java index d833895..12aa417 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java @@ -65,10 +65,9 @@ public class HashWorkerPartitioner<I extends WritableComparable, @Override public PartitionExchange updatePartitionOwners( WorkerInfo myWorkerInfo, - Collection<? extends PartitionOwner> masterSetPartitionOwners, - PartitionStore<I, V, E> partitionStore) { + Collection<? extends PartitionOwner> masterSetPartitionOwners) { return PartitionBalancer.updatePartitionOwners(partitionOwnerList, - myWorkerInfo, masterSetPartitionOwners, partitionStore); + myWorkerInfo, masterSetPartitionOwners); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java index 50c750a..d2363fb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitioner.java @@ -47,6 +47,13 @@ public interface MasterGraphPartitioner<I extends WritableComparable, Collection<WorkerInfo> availableWorkerInfos, int maxWorkers); /** + * Sets partition owners for the graph. + * Used when loading from a checkpoint. + * @param partitionOwners assigned partition owners. + */ + void setPartitionOwners(Collection<PartitionOwner> partitionOwners); + + /** * After the worker stats have been merged to a single list, the master can * use this information to send commands to the workers for any * {@link Partition} changes. This protocol is specific to the http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java index 3454d62..0d8f3cf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionBalancer.java @@ -309,14 +309,12 @@ public class PartitionBalancer { * @param myWorkerInfo Worker info * @param masterSetPartitionOwners Master set partition owners, received * prior to beginning the superstep - * @param partitionStore Partition store for the given worker * @return Information for the partition exchange. */ public static PartitionExchange updatePartitionOwners( List<PartitionOwner> partitionOwnerList, WorkerInfo myWorkerInfo, - Collection<? extends PartitionOwner> masterSetPartitionOwners, - PartitionStore partitionStore) { + Collection<?
extends PartitionOwner> masterSetPartitionOwners) { partitionOwnerList.clear(); partitionOwnerList.addAll(masterSetPartitionOwners); http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java index 0ac74da..f303a09 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java @@ -69,22 +69,6 @@ public interface PartitionOwner extends Writable { void setPreviousWorkerInfo(WorkerInfo workerInfo); /** - * If this is a restarted checkpoint, the worker will use this information - * to determine where the checkpointed partition was stored on HDFS. - * - * @return Prefix of the checkpoint HDFS files for this partition, null if - * this is not a restarted superstep. - */ - String getCheckpointFilesPrefix(); - - /** - * Set the checkpoint files prefix. Master uses this. - * - * @param checkpointFilesPrefix HDFS checkpoint file prefix - */ - void setCheckpointFilesPrefix(String checkpointFilesPrefix); - - /** * Write to the output, but don't serialize the whole WorkerInfo, * instead use just the task id * http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java index f128f34..7d4c1cb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import com.google.common.collect.Lists; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; @@ -74,6 +75,11 @@ public abstract class SimpleMasterPartitioner<I extends WritableComparable, } @Override + public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) { + partitionOwnerList = Lists.newArrayList(partitionOwners); + } + + @Override public Collection<PartitionOwner> generateChangedPartitionOwners( Collection<PartitionStats> allPartitionStatsList, Collection<WorkerInfo> availableWorkers, http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java index 3c0de44..0ee8d92 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java @@ -71,11 +71,9 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable, @Override public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo, - Collection<? 
extends PartitionOwner> masterSetPartitionOwners, - PartitionStore<I, V, E> partitionStore) { + Collection<? extends PartitionOwner> masterSetPartitionOwners) { PartitionExchange exchange = PartitionBalancer.updatePartitionOwners( - partitionOwnerList, myWorkerInfo, masterSetPartitionOwners, - partitionStore); + partitionOwnerList, myWorkerInfo, masterSetPartitionOwners); extractAvailableWorkers(); return exchange; } http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java index 004ea81..211fedb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitioner.java @@ -75,14 +75,11 @@ public interface WorkerGraphPartitioner<I extends WritableComparable, * @param myWorkerInfo Worker info. * @param masterSetPartitionOwners Master set partition owners, received * prior to beginning the superstep - * @param partitionStore Partition store for this worker - * (can be used to fill the return map of partitions to send) * @return Information for the partition exchange. */ PartitionExchange updatePartitionOwners( WorkerInfo myWorkerInfo, - Collection<? extends PartitionOwner> masterSetPartitionOwners, - PartitionStore<I, V, E> partitionStore); + Collection<? extends PartitionOwner> masterSetPartitionOwners); /** * Get a collection of the {@link PartitionOwner} objects. http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index 2c4606f..bb2865c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -142,81 +142,107 @@ public class InternalVertexRunner { GiraphConfiguration conf, String[] vertexInputData, String[] edgeInputData) throws Exception { - File tmpDir = null; + // Prepare input file, output folder and temporary folders + File tmpDir = FileUtils.createTestDir(conf.getComputationName()); try { - // Prepare input file, output folder and temporary folders - tmpDir = FileUtils.createTestDir(conf.getComputationName()); - - File vertexInputFile = null; - File edgeInputFile = null; - if (conf.hasVertexInputFormat()) { - vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt"); - } - if (conf.hasEdgeInputFormat()) { - edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt"); - } + return run(conf, vertexInputData, edgeInputData, null, tmpDir); + } finally { + FileUtils.delete(tmpDir); + } + } - File outputDir = FileUtils.createTempDir(tmpDir, "output"); - File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); - File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); - File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); + /** + * Attempts to run the vertex internally in the current JVM, reading from and + * writing to a temporary folder on local disk. 
Will start its own zookeeper + * instance. + * + * + * @param conf GiraphClasses specifying which types to use + * @param vertexInputData linewise vertex input data + * @param edgeInputData linewise edge input data + * @param checkpointsDir if set, will use this folder + * for storing checkpoints. + * @param tmpDir file path for storing temporary files. + * @return linewise output data, or null if job fails + * @throws Exception if anything goes wrong + */ + public static Iterable<String> run( + GiraphConfiguration conf, + String[] vertexInputData, + String[] edgeInputData, + String checkpointsDir, + File tmpDir) throws Exception { + File vertexInputFile = null; + File edgeInputFile = null; + if (conf.hasVertexInputFormat()) { + vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt"); + } + if (conf.hasEdgeInputFormat()) { + edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt"); + } - // Write input data to disk - if (conf.hasVertexInputFormat()) { - FileUtils.writeLines(vertexInputFile, vertexInputData); - } - if (conf.hasEdgeInputFormat()) { - FileUtils.writeLines(edgeInputFile, edgeInputData); - } + File outputDir = FileUtils.createTempDir(tmpDir, "output"); + File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); + File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); + // Write input data to disk + if (conf.hasVertexInputFormat()) { + FileUtils.writeLines(vertexInputFile, vertexInputData); + } + if (conf.hasEdgeInputFormat()) { + FileUtils.writeLines(edgeInputFile, edgeInputData); + } - int localZookeeperPort = findAvailablePort(); + int localZookeeperPort = findAvailablePort(); - conf.setWorkerConfiguration(1, 1, 100.0f); - GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); - GiraphConstants.LOCAL_TEST_MODE.set(conf, true); - conf.setZookeeperList("localhost:" + + conf.setWorkerConfiguration(1, 1, 100.0f); + GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); + GiraphConstants.LOCAL_TEST_MODE.set(conf, true); + conf.setZookeeperList("localhost:" + String.valueOf(localZookeeperPort)); - conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); - GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, - zkMgrDir.toString()); - GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); + conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, + zkMgrDir.toString()); - // Create and configure the job to run the vertex - GiraphJob job = new GiraphJob(conf, conf.getComputationName()); + if (checkpointsDir == null) { + checkpointsDir = FileUtils.createTempDir( + tmpDir, "_checkpoints").toString(); + } + GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir); - Job internalJob = job.getInternalJob(); - if (conf.hasVertexInputFormat()) { - GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(), - new Path(vertexInputFile.toString())); - } - if (conf.hasEdgeInputFormat()) { - GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(), - new Path(edgeInputFile.toString())); - } - FileOutputFormat.setOutputPath(job.getInternalJob(), - new Path(outputDir.toString())); + // Create and configure the job to run the vertex + GiraphJob job = new GiraphJob(conf, conf.getComputationName()); - // Configure a local zookeeper instance - Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); + Job internalJob = job.getInternalJob(); + if (conf.hasVertexInputFormat()) { + 
GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(), + new Path(vertexInputFile.toString())); + } + if (conf.hasEdgeInputFormat()) { + GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(), + new Path(edgeInputFile.toString())); + } + FileOutputFormat.setOutputPath(job.getInternalJob(), + new Path(outputDir.toString())); - QuorumPeerConfig qpConfig = new QuorumPeerConfig(); - qpConfig.parseProperties(zkProperties); + // Configure a local zookeeper instance + Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); - boolean success = runZooKeeperAndJob(qpConfig, job); - if (!success) { - return null; - } + QuorumPeerConfig qpConfig = new QuorumPeerConfig(); + qpConfig.parseProperties(zkProperties); - File outFile = new File(outputDir, "part-m-00000"); - if (conf.hasVertexOutputFormat() && outFile.canRead()) { - return Files.readLines(outFile, Charsets.UTF_8); - } else { - return ImmutableList.of(); - } - } finally { - FileUtils.delete(tmpDir); + boolean success = runZooKeeperAndJob(qpConfig, job); + if (!success) { + return null; } + + File outFile = new File(outputDir, "part-m-00000"); + if (conf.hasVertexOutputFormat() && outFile.canRead()) { + return Files.readLines(outFile, Charsets.UTF_8); + } else { + return ImmutableList.of(); + } + } /** @@ -236,42 +262,97 @@ public class InternalVertexRunner { E extends Writable> void run( GiraphConfiguration conf, TestGraph<I, V, E> graph) throws Exception { - File tmpDir = null; + // Prepare temporary folders + File tmpDir = FileUtils.createTestDir(conf.getComputationName()); try { - // Prepare temporary folders - tmpDir = FileUtils.createTestDir(conf.getComputationName()); + run(conf, graph, tmpDir, null); + } finally { + FileUtils.delete(tmpDir); + } + } - File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); - File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); - File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); + /** + * Attempts to run the vertex internally in the current JVM, + * reading from an in-memory graph. Will start its own zookeeper + * instance. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + * @param conf GiraphClasses specifying which types to use + * @param graph input graph + * @param tmpDir file path for storing temporary files. + * @param checkpointsDir if set, will use this folder + * for storing checkpoints. + * @throws Exception if anything goes wrong + */ + public static <I extends WritableComparable, + V extends Writable, + E extends Writable> void run( + GiraphConfiguration conf, + TestGraph<I, V, E> graph, + File tmpDir, + String checkpointsDir) throws Exception { + File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); + File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); + + if (checkpointsDir == null) { + checkpointsDir = FileUtils. 
+ createTempDir(tmpDir, "_checkpoints").toString(); + } - conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class); + conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class); - // Create and configure the job to run the vertex - GiraphJob job = new GiraphJob(conf, conf.getComputationName()); + // Create and configure the job to run the vertex + GiraphJob job = new GiraphJob(conf, conf.getComputationName()); - InMemoryVertexInputFormat.setGraph(graph); + InMemoryVertexInputFormat.setGraph(graph); - int localZookeeperPort = findAvailablePort(); + int localZookeeperPort = findAvailablePort(); - conf.setWorkerConfiguration(1, 1, 100.0f); - GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); - GiraphConstants.LOCAL_TEST_MODE.set(conf, true); - GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" + + conf.setWorkerConfiguration(1, 1, 100.0f); + GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); + GiraphConstants.LOCAL_TEST_MODE.set(conf, true); + GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" + String.valueOf(localZookeeperPort)); - conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); - GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, - zkMgrDir.toString()); - GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); + conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, + zkMgrDir.toString()); + GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir); + + // Configure a local zookeeper instance + Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); + + QuorumPeerConfig qpConfig = new QuorumPeerConfig(); + qpConfig.parseProperties(zkProperties); - // Configure a local zookeeper instance - Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); + runZooKeeperAndJob(qpConfig, job); - QuorumPeerConfig qpConfig = new QuorumPeerConfig(); - qpConfig.parseProperties(zkProperties); + } - runZooKeeperAndJob(qpConfig, job); + /** + * Attempts to run the vertex internally in the current JVM, reading and + * writing to an in-memory graph. Will start its own zookeeper + * instance. + * + * @param <I> Vertex ID + * @param <V> Vertex Value + * @param <E> Edge Value + * @param conf GiraphClasses specifying which types to use + * @param graph input graph + * @return Output graph + * @throws Exception if anything goes wrong + */ + public static <I extends WritableComparable, + V extends Writable, + E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput( + GiraphConfiguration conf, + TestGraph<I, V, E> graph) throws Exception { + // Prepare temporary folders + File tmpDir = FileUtils.createTestDir(conf.getComputationName()); + try { + return runWithInMemoryOutput(conf, graph, tmpDir, null); } finally { FileUtils.delete(tmpDir); } @@ -287,6 +368,9 @@ public class InternalVertexRunner { * @param <E> Edge Value * @param conf GiraphClasses specifying which types to use * @param graph input graph + * @param tmpDir file path for storing temporary files. + * @param checkpointsDir if set, will use this folder + * for storing checkpoints. 
* @return Output graph * @throws Exception if anything goes wrong */ @@ -294,10 +378,12 @@ public class InternalVertexRunner { V extends Writable, E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput( GiraphConfiguration conf, - TestGraph<I, V, E> graph) throws Exception { + TestGraph<I, V, E> graph, + File tmpDir, + String checkpointsDir) throws Exception { conf.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class); InMemoryVertexOutputFormat.initializeOutputGraph(conf); - InternalVertexRunner.run(conf, graph); + InternalVertexRunner.run(conf, graph, tmpDir, checkpointsDir); return InMemoryVertexOutputFormat.getOutputGraph(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java index af45426..a5b1567 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/io/ExtendedDataInputOutput.java @@ -65,6 +65,6 @@ public class ExtendedDataInputOutput extends DataInputOutput { @Override public void readFields(DataInput in) throws IOException { - WritableUtils.readExtendedDataOutput(in, conf); + dataOutput = WritableUtils.readExtendedDataOutput(in, conf); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index de7af28..0d90a59 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -70,10 +70,13 @@ import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.log4j.Level; @@ -92,9 +95,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import net.iharder.Base64; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.Charset; @@ -110,6 +111,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; /** @@ -566,7 +568,7 @@ public class BspServiceWorker<I extends WritableComparable, Collection<? 
extends PartitionOwner> masterSetPartitionOwners = startSuperstep(); workerGraphPartitioner.updatePartitionOwners( - getWorkerInfo(), masterSetPartitionOwners, getPartitionStore()); + getWorkerInfo(), masterSetPartitionOwners); /*if[HADOOP_NON_SECURE] workerClient.setup(); @@ -1370,73 +1372,42 @@ public class BspServiceWorker<I extends WritableComparable, // Algorithm: // For each partition, dump vertices and messages Path metadataFilePath = - new Path(getCheckpointBasePath(getSuperstep()) + "." + - getHostnamePartitionId() + - CHECKPOINT_METADATA_POSTFIX); - Path verticesFilePath = - new Path(getCheckpointBasePath(getSuperstep()) + "." + - getHostnamePartitionId() + - CHECKPOINT_VERTICES_POSTFIX); + createCheckpointFilePathSafe(CHECKPOINT_METADATA_POSTFIX); Path validFilePath = - new Path(getCheckpointBasePath(getSuperstep()) + "." + - getHostnamePartitionId() + - CHECKPOINT_VALID_POSTFIX); + createCheckpointFilePathSafe(CHECKPOINT_VALID_POSTFIX); + Path checkpointFilePath = + createCheckpointFilePathSafe(CHECKPOINT_DATA_POSTFIX); - // Remove these files if they already exist (shouldn't though, unless - // of previous failure of this worker) - if (getFs().delete(validFilePath, false)) { - LOG.warn("storeCheckpoint: Removed valid file " + - validFilePath); - } - if (getFs().delete(metadataFilePath, false)) { - LOG.warn("storeCheckpoint: Removed metadata file " + - metadataFilePath); - } - if (getFs().delete(verticesFilePath, false)) { - LOG.warn("storeCheckpoint: Removed file " + verticesFilePath); - } - FSDataOutputStream verticesOutputStream = - getFs().create(verticesFilePath); - ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream(); - DataOutput metadataOutput = new DataOutputStream(metadataByteStream); - for (Integer partitionId : getPartitionStore().getPartitionIds()) { - Partition<I, V, E> partition = - getPartitionStore().getOrCreatePartition(partitionId); - long startPos = verticesOutputStream.getPos(); - partition.write(verticesOutputStream); - // write messages - getServerData().getCurrentMessageStore().writePartition( - verticesOutputStream, partition.getId()); - // Write the metadata for this partition - // Format: - // <index count> - // <index 0 start pos><partition id> - // <index 1 start pos><partition id> - metadataOutput.writeLong(startPos); - metadataOutput.writeInt(partition.getId()); - if (LOG.isDebugEnabled()) { - LOG.debug("storeCheckpoint: Vertex file starting " + - "offset = " + startPos + ", length = " + - (verticesOutputStream.getPos() - startPos) + - ", partition = " + partition.toString()); - } - getPartitionStore().putPartition(partition); - getContext().progress(); - } // Metadata is buffered and written at the end since it's small and // needs to know how many partitions this worker owns FSDataOutputStream metadataOutputStream = getFs().create(metadataFilePath); metadataOutputStream.writeInt(getPartitionStore().getNumPartitions()); - metadataOutputStream.write(metadataByteStream.toByteArray()); + + for (Integer partitionId : getPartitionStore().getPartitionIds()) { + metadataOutputStream.writeInt(partitionId); + } metadataOutputStream.close(); - verticesOutputStream.close(); - if (LOG.isInfoEnabled()) { - LOG.info("storeCheckpoint: Finished metadata (" + - metadataFilePath + ") and vertices (" + verticesFilePath + ")."); + + storeCheckpointVertices(); + + FSDataOutputStream checkpointOutputStream = + getFs().create(checkpointFilePath); + workerContext.write(checkpointOutputStream); + getContext().progress(); + + for (Integer partitionId : 
getPartitionStore().getPartitionIds()) { + // write messages + checkpointOutputStream.writeInt(partitionId); + getServerData().getCurrentMessageStore().writePartition( + checkpointOutputStream, partitionId); + getContext().progress(); + } + checkpointOutputStream.close(); + getFs().createNewFile(validFilePath); // Notify master that checkpoint is stored @@ -1462,116 +1433,247 @@ public class BspServiceWorker<I extends WritableComparable, } } + /** + * Create checkpoint file safely. If the file already exists, remove it + * first. + * @param name file extension + * @return full file path to newly created file + * @throws IOException + */ + private Path createCheckpointFilePathSafe(String name) throws IOException { + Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + "." + + getTaskPartition() + name); + // Remove these files if they already exist (shouldn't though, unless + // this worker failed previously) + if (getFs().delete(validFilePath, false)) { + LOG.warn("storeCheckpoint: Removed " + name + " file " + + validFilePath); + } + return validFilePath; + } + + /** + * Returns path to saved checkpoint. + * Doesn't check if file actually exists. + * @param superstep saved superstep. + * @param name extension name + * @return full file path to checkpoint file + */ + private Path getSavedCheckpoint(long superstep, String name) { + return new Path(getSavedCheckpointBasePath(superstep) + "." + + getTaskPartition() + name); + } + + /** + * Saves partitions. To speed up this operation, it + * runs in multiple threads. + */ + private void storeCheckpointVertices() { + final int numPartitions = getPartitionStore().getNumPartitions(); + int numThreads = Math.min( + GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()), + numPartitions); + + final Queue<Integer> partitionIdQueue = + (numPartitions == 0) ? new LinkedList<Integer>() : + new ArrayBlockingQueue<Integer>(numPartitions); + Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds()); + + final CompressionCodec codec = + new CompressionCodecFactory(getConfiguration()) + .getCodecByClassName( + GiraphConstants.CHECKPOINT_COMPRESSION_CODEC + .get(getConfiguration())); + + long t0 = System.currentTimeMillis(); + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + + @Override + public Void call() throws Exception { + while (!partitionIdQueue.isEmpty()) { + Integer partitionId = partitionIdQueue.poll(); + if (partitionId == null) { + break; + } + Path path = + createCheckpointFilePathSafe("_" + partitionId + + CHECKPOINT_VERTICES_POSTFIX); + + FSDataOutputStream uncompressedStream = + getFs().create(path); + + + DataOutputStream stream = codec == null ? uncompressedStream : + new DataOutputStream( + codec.createOutputStream(uncompressedStream)); + + Partition<I, V, E> partition = + getPartitionStore().getOrCreatePartition(partitionId); + + partition.write(stream); + + getPartitionStore().putPartition(partition); + + stream.close(); + uncompressedStream.close(); + } + return null; + } + + + }; + } + }; + + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "checkpoint-vertices-%d", getContext()); + + LOG.info("Saved checkpoint in " + (System.currentTimeMillis() - t0) + + " ms, using " + numThreads + " threads"); + } + + /** + * Load saved partitions in multiple threads.
+ * @param superstep superstep to load + * @param partitions list of partitions to load + */ + private void loadCheckpointVertices(final long superstep, + List<Integer> partitions) { + int numThreads = Math.min( + GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()), + partitions.size()); + + final Queue<Integer> partitionIdQueue = + new ConcurrentLinkedQueue<>(partitions); + + final CompressionCodec codec = + new CompressionCodecFactory(getConfiguration()) + .getCodecByClassName( + GiraphConstants.CHECKPOINT_COMPRESSION_CODEC + .get(getConfiguration())); + + long t0 = System.currentTimeMillis(); + + CallableFactory<Void> callableFactory = new CallableFactory<Void>() { + @Override + public Callable<Void> newCallable(int callableId) { + return new Callable<Void>() { + + @Override + public Void call() throws Exception { + while (!partitionIdQueue.isEmpty()) { + Integer partitionId = partitionIdQueue.poll(); + if (partitionId == null) { + break; + } + Path path = + getSavedCheckpoint(superstep, "_" + partitionId + + CHECKPOINT_VERTICES_POSTFIX); + + FSDataInputStream compressedStream = + getFs().open(path); + + DataInputStream stream = codec == null ? compressedStream : + new DataInputStream( + codec.createInputStream(compressedStream)); + + Partition<I, V, E> partition = + getConfiguration().createPartition(partitionId, getContext()); + + partition.readFields(stream); + + getPartitionStore().addPartition(partition); + + stream.close(); + } + return null; + } + + }; + } + }; + + ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, + "load-vertices-%d", getContext()); + + LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - t0) + + " ms, using " + numThreads + " threads"); + } + @Override public VertexEdgeCount loadCheckpoint(long superstep) { - try { - // clear old message stores - getServerData().getIncomingMessageStore().clearAll(); - getServerData().getCurrentMessageStore().clearAll(); - } catch (IOException e) { - throw new RuntimeException( - "loadCheckpoint: Failed to clear message stores ", e); - } + Path metadataFilePath = + getSavedCheckpoint(superstep, CHECKPOINT_METADATA_POSTFIX); + Path checkpointFilePath = + getSavedCheckpoint(superstep, CHECKPOINT_DATA_POSTFIX); // Algorithm: // Examine all the partition owners and load the ones // that match my hostname and id from the master designated checkpoint // prefixes. 
- long startPos = 0; - int loadedPartitions = 0; - for (PartitionOwner partitionOwner : - workerGraphPartitioner.getPartitionOwners()) { - if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) { - String metadataFile = - partitionOwner.getCheckpointFilesPrefix() + - CHECKPOINT_METADATA_POSTFIX; - String partitionsFile = - partitionOwner.getCheckpointFilesPrefix() + - CHECKPOINT_VERTICES_POSTFIX; - try { - int partitionId = -1; - DataInputStream metadataStream = - getFs().open(new Path(metadataFile)); - int partitions = metadataStream.readInt(); - for (int i = 0; i < partitions; ++i) { - startPos = metadataStream.readLong(); - partitionId = metadataStream.readInt(); - if (partitionId == partitionOwner.getPartitionId()) { - break; - } - } - if (partitionId != partitionOwner.getPartitionId()) { - throw new IllegalStateException( - "loadCheckpoint: " + partitionOwner + - " not found!"); - } - metadataStream.close(); - Partition<I, V, E> partition = - getConfiguration().createPartition(partitionId, getContext()); - DataInputStream partitionsStream = - getFs().open(new Path(partitionsFile)); - if (partitionsStream.skip(startPos) != startPos) { - throw new IllegalStateException( - "loadCheckpoint: Failed to skip " + startPos + - " on " + partitionsFile); - } - partition.readFields(partitionsStream); - getServerData().getIncomingMessageStore().readFieldsForPartition( - partitionsStream, partitionId); - partitionsStream.close(); - if (LOG.isInfoEnabled()) { - LOG.info("loadCheckpoint: Loaded partition " + - partition); - } - if (getPartitionStore().hasPartition(partitionId)) { - throw new IllegalStateException( - "loadCheckpoint: Already has partition owner " + - partitionOwner); - } - getPartitionStore().addPartition(partition); - getContext().progress(); - ++loadedPartitions; - } catch (IOException e) { - throw new RuntimeException( - "loadCheckpoint: Failed to get partition owner " + - partitionOwner, e); - } + try { + DataInputStream metadataStream = + getFs().open(metadataFilePath); + + int partitions = metadataStream.readInt(); + List<Integer> partitionIds = new ArrayList<>(partitions); + for (int i = 0; i < partitions; i++) { + int partitionId = metadataStream.readInt(); + partitionIds.add(partitionId); } - } - if (LOG.isInfoEnabled()) { - LOG.info("loadCheckpoint: Loaded " + loadedPartitions + - " partitions of out " + - workerGraphPartitioner.getPartitionOwners().size() + - " total."); - } - // Load global stats and superstep classes - GlobalStats globalStats = new GlobalStats(); - SuperstepClasses superstepClasses = new SuperstepClasses(); - String finalizedCheckpointPath = - getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; - try { + loadCheckpointVertices(superstep, partitionIds); + + getContext().progress(); + + metadataStream.close(); + + DataInputStream checkpointStream = + getFs().open(checkpointFilePath); + workerContext.readFields(checkpointStream); + + // Load global stats and superstep classes + GlobalStats globalStats = new GlobalStats(); + SuperstepClasses superstepClasses = new SuperstepClasses(); + String finalizedCheckpointPath = + getSavedCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX; DataInputStream finalizedStream = getFs().open(new Path(finalizedCheckpointPath)); globalStats.readFields(finalizedStream); superstepClasses.readFields(finalizedStream); getConfiguration().updateSuperstepClasses(superstepClasses); - } catch (IOException e) { - throw new IllegalStateException( - "loadCheckpoint: Failed to load global stats and superstep 
classes", - e); - } + getServerData().resetMessageStores(); - getServerData().prepareSuperstep(); - // Communication service needs to setup the connections prior to - // processing vertices + for (int i = 0; i < partitions; i++) { + int partitionId = checkpointStream.readInt(); + getServerData().getCurrentMessageStore().readFieldsForPartition( + checkpointStream, partitionId); + } + checkpointStream.close(); + + if (LOG.isInfoEnabled()) { + LOG.info("loadCheckpoint: Loaded " + + workerGraphPartitioner.getPartitionOwners().size() + + " total."); + } + + // Communication service needs to setup the connections prior to + // processing vertices /*if[HADOOP_NON_SECURE] workerClient.setup(); else[HADOOP_NON_SECURE]*/ - workerClient.setup(getConfiguration().authenticate()); + workerClient.setup(getConfiguration().authenticate()); /*end[HADOOP_NON_SECURE]*/ - return new VertexEdgeCount(globalStats.getVertexCount(), - globalStats.getEdgeCount()); + return new VertexEdgeCount(globalStats.getVertexCount(), + globalStats.getEdgeCount()); + + } catch (IOException e) { + throw new RuntimeException( + "loadCheckpoint: Failed for superstep=" + superstep, e); + } } /** @@ -1651,7 +1753,7 @@ else[HADOOP_NON_SECURE]*/ // 5. Add the partitions to myself. PartitionExchange partitionExchange = workerGraphPartitioner.updatePartitionOwners( - getWorkerInfo(), masterSetPartitionOwners, getPartitionStore()); + getWorkerInfo(), masterSetPartitionOwners); workerClient.openConnections(); Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap = http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java index 29835c5..aca9944 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java @@ -25,6 +25,9 @@ import org.apache.giraph.graph.GraphState; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.List; /** @@ -33,9 +36,8 @@ import java.util.List; */ @SuppressWarnings("rawtypes") public abstract class WorkerContext - extends DefaultImmutableClassesGiraphConfigurable - implements WorkerAggregatorUsage { - + extends DefaultImmutableClassesGiraphConfigurable + implements WorkerAggregatorUsage, Writable { /** Global graph state */ private GraphState graphState; /** Worker aggregator usage */ @@ -203,4 +205,12 @@ public abstract class WorkerContext public <A extends Writable> A getAggregatedValue(String name) { return workerAggregatorUsage.<A>getAggregatedValue(name); } + + @Override + public void write(DataOutput dataOutput) throws IOException { + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java index 96bd5d7..57bebbd 100644 --- 
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -66,7 +66,7 @@ public class SimpleRangePartitionFactoryTest {
     WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
         factory.createWorkerGraphPartitioner();
-    workerPartitioner.updatePartitionOwners(null, owners, null);
+    workerPartitioner.updatePartitionOwners(null, owners);
     LongWritable longWritable = new LongWritable();
     int[] partitions = new int[keySpaceSize];

http://git-wip-us.apache.org/repos/asf/giraph/blob/02d9e6c2/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
new file mode 100644
index 0000000..387b937
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.examples.SimpleSuperstepComputation;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that worker context and master computation
+ * are properly saved and loaded back from a checkpoint.
+ */
+public class TestCheckpointing extends BspCase {
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(TestCheckpointing.class);
+  /** ID to be used with test job */
+  public static final String TEST_JOB_ID = "test_job";
+  /**
+   * Compute will double-check that we don't run supersteps earlier
+   * than the one specified by this key. That way we ensure that the
+   * computation actually restarted and was not recalculated from the
+   * beginning.
+   */
+  public static final String KEY_MIN_SUPERSTEP = "minimum.superstep";
+
+  /**
+   * Create the test case
+   */
+  public TestCheckpointing() {
+    super(TestCheckpointing.class.getName());
+  }
+
+
+  @Test
+  public void testBspCheckpoint()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path checkpointsDir = getTempPath("checkpointing");
+    Path outputPath = getTempPath(getCallingMethodName());
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(
+        CheckpointComputation.class);
+    conf.setWorkerContextClass(
+        CheckpointVertexWorkerContext.class);
+    conf.setMasterComputeClass(
+        CheckpointVertexMasterCompute.class);
+    conf.setVertexInputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
+    conf.set("mapred.job.id", TEST_JOB_ID);
+    conf.set(KEY_MIN_SUPERSTEP, "0");
+    GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
+
+    GiraphConfiguration configuration = job.getConfiguration();
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(configuration, checkpointsDir.toString());
+    GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(configuration, false);
+    configuration.setCheckpointFrequency(2);
+
+    assertTrue(job.run(true));
+
+    long idSum = 0;
+    if (!runningInDistributedMode()) {
+      FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
+          outputPath);
+      idSum = CheckpointVertexWorkerContext
+          .getFinalSum();
+      LOG.info("testBspCheckpoint: idSum = " + idSum +
+          " fileLen = " + fileStatus.getLen());
+    }
+
+    // Restart the test from superstep 2
+    LOG.info("testBspCheckpoint: Restarting from superstep 2" +
+        " with checkpoint path = " + checkpointsDir);
+    outputPath = getTempPath("checkpointing_restarted");
+
+    GiraphConstants.RESTART_JOB_ID.set(conf, TEST_JOB_ID);
+    conf.set("mapred.job.id", "restarted_test_job");
+    conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
+    conf.set(KEY_MIN_SUPERSTEP, "2");
+
+    GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
+        conf, outputPath);
+
+    GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
+        checkpointsDir.toString());
+
+    assertTrue(restartedJob.run(true));
+    if (!runningInDistributedMode()) {
+      long idSumRestarted =
+          CheckpointVertexWorkerContext
+              .getFinalSum();
+      LOG.info("testBspCheckpoint: idSumRestarted = " +
+          idSumRestarted);
+      assertEquals(idSum, idSumRestarted);
+    }
+  }
+
+
+  /**
+   * Actual computation.
+   */
+  public static class CheckpointComputation extends
+      BasicComputation<LongWritable, IntWritable, FloatWritable,
+          FloatWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+        Iterable<FloatWritable> messages) throws IOException {
+      CheckpointVertexWorkerContext workerContext = getWorkerContext();
+      assertEquals(getSuperstep() + 1, workerContext.testValue);
+
+      if (getSuperstep() < getConf().getInt(KEY_MIN_SUPERSTEP, Integer.MAX_VALUE)) {
+        fail("Should not be running compute on superstep " + getSuperstep());
+      }
+
+      if (getSuperstep() > 4) {
+        vertex.voteToHalt();
+        return;
+      }
+
+      aggregate(LongSumAggregator.class.getName(),
+          new LongWritable(vertex.getId().get()));
+
+      float msgValue = 0.0f;
+      for (FloatWritable message : messages) {
+        float curMsgValue = message.get();
+        msgValue += curMsgValue;
+      }
+
+      int vertexValue = vertex.getValue().get();
+      vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+            (float) vertexValue);
+        Edge<LongWritable, FloatWritable> newEdge =
+            EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
+        vertex.addEdge(newEdge);
+        sendMessage(edge.getTargetVertexId(), newEdgeValue);
+      }
+    }
+  }
+
+  /**
+   * Worker context associated with the test.
+   */
+  public static class CheckpointVertexWorkerContext
+      extends DefaultWorkerContext {
+    /** User can access this after the application finishes if local */
+    private static long FINAL_SUM;
+
+    private int testValue;
+
+    public static long getFinalSum() {
+      return FINAL_SUM;
+    }
+
+    @Override
+    public void postApplication() {
+      setFinalSum(this.<LongWritable>getAggregatedValue(
+          LongSumAggregator.class.getName()).get());
+      LOG.info("FINAL_SUM=" + FINAL_SUM);
+    }
+
+    /**
+     * Set the final sum
+     *
+     * @param value sum
+     */
+    private static void setFinalSum(long value) {
+      FINAL_SUM = value;
+    }
+
+    @Override
+    public void preSuperstep() {
+      assertEquals(getSuperstep(), testValue++);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      super.readFields(dataInput);
+      testValue = dataInput.readInt();
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      super.write(dataOutput);
+      dataOutput.writeInt(testValue);
+    }
+  }
+
+  /**
+   * Master compute
+   */
+  public static class CheckpointVertexMasterCompute extends
+      DefaultMasterCompute {
+
+    private int testValue = 0;
+
+    @Override
+    public void compute() {
+      long superstep = getSuperstep();
+      assertEquals(superstep, testValue++);
+    }
+
+    @Override
+    public void initialize() throws InstantiationException,
+        IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      testValue = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      out.writeInt(testValue);
+    }
+  }
+
+}
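For reference, the knobs this test exercises are the same ones a user job would set to checkpoint on the first run and restart on the second. A minimal sketch under those assumptions; the checkpoint directory and job id values here are placeholders, not part of this commit:

    import org.apache.giraph.conf.GiraphConfiguration;
    import org.apache.giraph.conf.GiraphConstants;

    public class RestartConfigSketch {
      /** First run: checkpoint every 2 supersteps and keep the files. */
      public static void enableCheckpointing(GiraphConfiguration conf) {
        GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, "/tmp/giraph-checkpoints");
        GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.set(conf, false);
        conf.setCheckpointFrequency(2);
      }

      /** Second run: resume the earlier job from superstep 2. */
      public static void restartFromCheckpoint(GiraphConfiguration conf) {
        // Must point at the same directory the first run checkpointed into.
        GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, "/tmp/giraph-checkpoints");
        // Job id of the run whose checkpoint we are loading ("test_job"
        // in the test above); "previous_job_id" is a placeholder.
        GiraphConstants.RESTART_JOB_ID.set(conf, "previous_job_id");
        conf.set(GiraphConstants.RESTART_SUPERSTEP, "2");
      }
    }

Because RESTART_JOB_ID selects the saved checkpoint path independently of the current job id, the restarted run can execute under a brand-new job id, which is exactly what the test verifies.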

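The checkpoint metadata consumed by loadCheckpoint() above is simply an int count followed by one int per partition id. A sketch that parses such a file with a plain DataInputStream; the local path is a hypothetical stand-in for the HDFS checkpoint file:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class MetadataSketch {
      public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(
            new FileInputStream("/tmp/checkpoint_metadata"))) {
          // Layout read by loadCheckpoint(): count, then partition ids.
          int partitions = in.readInt();
          List<Integer> partitionIds = new ArrayList<>(partitions);
          for (int i = 0; i < partitions; i++) {
            partitionIds.add(in.readInt());
          }
          System.out.println("Partitions in checkpoint: " + partitionIds);
        }
      }
    }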