Repository: giraph Updated Branches: refs/heads/trunk 8675c84a8 -> 7f2d58445
GIRAPH-972 Race condition in checkpointing Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7f2d5844 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7f2d5844 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7f2d5844 Branch: refs/heads/trunk Commit: 7f2d58445e2353a1a42fbb4282ed5cad724186b5 Parents: 8675c84 Author: Sergey Edunov <[email protected]> Authored: Thu Dec 18 10:05:36 2014 -0800 Committer: Sergey Edunov <[email protected]> Committed: Thu Dec 18 15:13:19 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 +++ .../java/org/apache/giraph/bsp/BspService.java | 21 +++++++++++++++++++ .../apache/giraph/master/BspServiceMaster.java | 22 +++++++++----------- .../apache/giraph/worker/BspServiceWorker.java | 8 +++---- 4 files changed, 38 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 7b54584..efa2878 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.2.0 - unreleased + + GIRAPH-972: Race condition in checkpointing (edunov) + GIRAPH-905: Giraph Debugger (netj via edunov) GIRAPH-966: Add a way to ignore some thread exceptions (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 579c772..0a5a7ba 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -1203,4 +1203,25 @@ public 
abstract class BspService<I extends WritableComparable, } + /** + * For every worker this method returns a unique number between 0 and N-1, + * where N is the number of workers; -1 if not in the worker list. + * This number stays the same throughout the computation. + * The task ID may differ from this number, and task IDs + * are not necessarily contiguous. + * @param workerInfo worker info object + * @return worker number + */ + protected int getWorkerId(WorkerInfo workerInfo) { + return getWorkerInfoList().indexOf(workerInfo); + } + + /** + * Returns worker info corresponding to specified worker id. + * @param id unique worker id + * @return WorkerInfo + */ + protected WorkerInfo getWorkerInfoById(int id) { + return getWorkerInfoList().get(id); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 39b4a1c..798f544 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -22,8 +22,6 @@ import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT; import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA; import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT; import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY; -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -413,9 +411,14 @@ public class BspServiceMaster<I extends WritableComparable, } for (String workerInfoPath : workerInfoPathList) { WorkerInfo workerInfo = new WorkerInfo(); - 
WritableUtils.readFieldsFromZnode( - getZkExt(), workerInfoPath, true, null, workerInfo); - workerInfoList.add(workerInfo); + try { + WritableUtils.readFieldsFromZnode( + getZkExt(), workerInfoPath, true, null, workerInfo); + workerInfoList.add(workerInfo); + } catch (IllegalStateException e) { + LOG.warn("Can't get info from worker, did it die in between? " + + "workerInfoPath=" + workerInfoPath, e); + } } return workerInfoList; } @@ -785,11 +788,6 @@ public class BspServiceMaster<I extends WritableComparable, getConfiguration().updateSuperstepClasses(superstepClasses); int prefixFileCount = finalizedStream.readInt(); - - Int2ObjectMap<WorkerInfo> workersMap = new Int2ObjectOpenHashMap<>(); - for (WorkerInfo worker : chosenWorkerInfoList) { - workersMap.put(worker.getTaskId(), worker); - } String checkpointFile = finalizedStream.readUTF(); for (int i = 0; i < prefixFileCount; ++i) { @@ -798,7 +796,7 @@ public class BspServiceMaster<I extends WritableComparable, DataInputStream metadataStream = fs.open(new Path(checkpointFile + "." 
+ mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX)); long partitions = metadataStream.readInt(); - WorkerInfo worker = workersMap.get(mrTaskId); + WorkerInfo worker = getWorkerInfoById(mrTaskId); for (long p = 0; p < partitions; ++p) { int partitionId = metadataStream.readInt(); PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId, @@ -1107,7 +1105,7 @@ public class BspServiceMaster<I extends WritableComparable, finalizedOutputStream.writeInt(chosenWorkerInfoList.size()); finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep)); for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { - finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId()); + finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo)); } globalCommHandler.write(finalizedOutputStream); aggregatorTranslation.write(finalizedOutputStream); http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 4ad8400..381e51a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -1462,8 +1462,8 @@ public class BspServiceWorker<I extends WritableComparable, * @throws IOException */ private Path createCheckpointFilePathSafe(String name) throws IOException { - Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + "." + - getTaskPartition() + name); + Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' 
+ + getWorkerId(workerInfo) + name); // Remove these files if they already exist (shouldn't though, unless // of previous failure of this worker) if (getFs().delete(validFilePath, false)) { @@ -1481,8 +1481,8 @@ public class BspServiceWorker<I extends WritableComparable, * @return fill file path to checkpoint file */ private Path getSavedCheckpoint(long superstep, String name) { - return new Path(getSavedCheckpointBasePath(superstep) + "." + - getTaskPartition() + name); + return new Path(getSavedCheckpointBasePath(superstep) + '.' + + getWorkerId(workerInfo) + name); } /**
