GIRAPH-514
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2afa961b Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2afa961b Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2afa961b Branch: refs/heads/trunk Commit: 2afa961beebf69c9024ce928b2e5ec8b9dceda32 Parents: cbc5a26 Author: Claudio Martella <[email protected]> Authored: Fri Feb 15 17:34:04 2013 +0100 Committer: Claudio Martella <[email protected]> Committed: Fri Feb 15 17:34:04 2013 +0100 ---------------------------------------------------------------------- .../comm/messages/SequentialFileMessageStore.java | 12 ++++++------ 1 files changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/2afa961b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java index 3698527..856aeac 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java @@ -98,7 +98,7 @@ public class SequentialFileMessageStore<I extends WritableComparable, if (LOG.isDebugEnabled()) { LOG.debug("addMessages: Deleting " + file); } - file.delete(); + //file.delete(); } file.createNewFile(); if (LOG.isDebugEnabled()) { @@ -336,7 +336,6 @@ public class SequentialFileMessageStore<I extends WritableComparable, } } - /** * Create new factory for this message store * @@ -377,7 +376,7 @@ public class SequentialFileMessageStore<I extends WritableComparable, public Factory(ImmutableClassesGiraphConfiguration config) { this.config = config; 
String jobId = config.get("mapred.job.id", "Unknown Job"); - + int taskId = config.getTaskPartition(); List<String> userPaths = Lists.newArrayList(config.getStrings( GiraphConstants.MESSAGES_DIRECTORY, GiraphConstants.MESSAGES_DIRECTORY_DEFAULT)); @@ -385,7 +384,8 @@ public class SequentialFileMessageStore<I extends WritableComparable, directories = new String[userPaths.size()]; int i = 0; for (String path : userPaths) { - String directory = path + jobId; + String directory = path + File.separator + jobId + File.separator + + taskId + File.separator; directories[i++] = directory; new File(directory).mkdirs(); } @@ -397,9 +397,9 @@ public class SequentialFileMessageStore<I extends WritableComparable, @Override public BasicMessageStore<I, M> newStore() { - int idx = storeCounter.getAndIncrement(); + int idx = Math.abs(storeCounter.getAndIncrement()); String fileName = - directories[idx % directories.length] + "/messages-" + idx; + directories[idx % directories.length] + "messages-" + idx; return new SequentialFileMessageStore<I, M>(config, bufferSize, fileName); }
