GIRAPH-514: DiskBackedMessageStores should take advantage of machines with multiple disks
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a47ca0b4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a47ca0b4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a47ca0b4

Branch: refs/heads/trunk
Commit: a47ca0b491489b4d28a435a5cdc3b218de0e0933
Parents: 329af80
Author: Claudio Martella <[email protected]>
Authored: Thu Feb 14 16:23:08 2013 +0100
Committer: Claudio Martella <[email protected]>
Committed: Thu Feb 14 16:23:08 2013 +0100

----------------------------------------------------------------------
 .../comm/messages/SequentialFileMessageStore.java | 28 ++++++++++----
 .../org/apache/giraph/conf/GiraphConstants.java   |  2 +-
 2 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
index 1805f0b..3698527 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java
@@ -40,6 +40,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -361,8 +362,8 @@ public class SequentialFileMessageStore<I extends WritableComparable,
       implements MessageStoreFactory<I, M, BasicMessageStore<I, M>> {
     /** Hadoop configuration */
     private final ImmutableClassesGiraphConfiguration config;
-    /** Directory in which we'll keep necessary files */
-    private final String directory;
+    /** Directories in which we'll keep necessary files */
+    private final String[] directories;
     /** Buffer size to use when reading and writing */
     private final int bufferSize;
     /** Counter for created message stores */
@@ -376,18 +377,29 @@ public class SequentialFileMessageStore<I extends WritableComparable,
     public Factory(ImmutableClassesGiraphConfiguration config) {
       this.config = config;
       String jobId = config.get("mapred.job.id", "Unknown Job");
-      this.directory = config.get(GiraphConstants.MESSAGES_DIRECTORY,
-          GiraphConstants.MESSAGES_DIRECTORY_DEFAULT) + jobId +
-          File.separator;
-      this.bufferSize = config.getInt(GiraphConstants.MESSAGES_BUFFER_SIZE,
+
+      List<String> userPaths = Lists.newArrayList(config.getStrings(
+          GiraphConstants.MESSAGES_DIRECTORY,
+          GiraphConstants.MESSAGES_DIRECTORY_DEFAULT));
+      Collections.shuffle(userPaths);
+      directories = new String[userPaths.size()];
+      int i = 0;
+      for (String path : userPaths) {
+        String directory = path + jobId;
+        directories[i++] = directory;
+        new File(directory).mkdirs();
+      }
+      this.bufferSize = config.getInt(
+          GiraphConstants.MESSAGES_BUFFER_SIZE,
           GiraphConstants.MESSAGES_BUFFER_SIZE_DEFAULT);
       storeCounter = new AtomicInteger();
-      new File(directory).mkdirs();
     }
 
     @Override
     public BasicMessageStore<I, M> newStore() {
-      String fileName = directory + storeCounter.getAndIncrement();
+      int idx = storeCounter.getAndIncrement();
+      String fileName =
+          directories[idx % directories.length] + "/messages-" + idx;
       return new SequentialFileMessageStore<I, M>(config, bufferSize,
           fileName);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a47ca0b4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 415009c..44d09c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -528,7 +528,7 @@ public interface GiraphConstants {
   /**
    * Comma-separated list of directories in the local filesystem for
-   * out-of-core partitions.
+   * out-of-core partitions.
    */
   String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
   /** Default directory for out-of-core partitions. */
