Updated Branches: refs/heads/trunk c17a6483c -> 7fc9390d3
GIRAPH 513 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7fc9390d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7fc9390d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7fc9390d Branch: refs/heads/trunk Commit: 7fc9390d383799aee37917ac1050db19c89b8b98 Parents: c17a648 Author: Claudio Martella <[email protected]> Authored: Thu Feb 14 15:33:07 2013 +0100 Committer: Claudio Martella <[email protected]> Committed: Thu Feb 14 15:33:07 2013 +0100 ---------------------------------------------------------------------- .../org/apache/giraph/conf/GiraphConstants.java | 5 ++- .../giraph/partition/DiskBackedPartitionStore.java | 31 ++++++++++---- 2 files changed, 26 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 8797c0e..415009c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -526,7 +526,10 @@ public interface GiraphConstants { /** Default size of buffer when reading and writing messages out-of-core. */ int MESSAGES_BUFFER_SIZE_DEFAULT = 8192; - /** Directory in the local filesystem for out-of-core partitions. */ + /** + * Comma-separated list of directories in the local filesystem for + * out-of-core partitions. + */ String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory"; /** Default directory for out-of-core partitions. */ String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions"; http://git-wip-us.apache.org/repos/asf/giraph/blob/7fc9390d/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java index 844a229..725de39 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java @@ -53,6 +53,8 @@ import org.apache.log4j.Logger; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; /** * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis. @@ -107,7 +109,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, /** Mapper context */ private final Context context; /** Base path where the partition files are written to */ - private final String basePath; + private final String[] basePaths; + /** Used to hash partition Ids */ + private final HashFunction hasher = Hashing.murmur3_32(); /** Maximum number of slots */ private final int maxInMemoryPartitions; /** Number of slots used */ @@ -128,9 +132,16 @@ public class DiskBackedPartitionStore<I extends WritableComparable, maxInMemoryPartitions = Math.max(1, conf.getInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, GiraphConstants.MAX_PARTITIONS_IN_MEMORY_DEFAULT)); - basePath = conf.get("mapred.job.id", "Unknown Job") + - conf.get(GiraphConstants.PARTITIONS_DIRECTORY, - GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT); + + // Take advantage of multiple disks + String[] userPaths = conf.getStrings( + GiraphConstants.PARTITIONS_DIRECTORY, + GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT); + basePaths = new String[userPaths.length]; + int i = 0; + for (String path : userPaths) { + basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); + } } @Override @@ -378,13 +389,13 @@ public class DiskBackedPartitionStore<I extends WritableComparable, */ private void offloadPartition(Partition<I, V, E, M> partition) throws IOException { - if (LOG.isInfoEnabled()) { - LOG.info("offloadPartition: writing partition " + partition.getId() + - " to disk."); - } File file = new File(getPartitionPath(partition.getId())); file.getParentFile().mkdirs(); file.createNewFile(); + if (LOG.isInfoEnabled()) { + LOG.info("offloadPartition: writing partition " + partition.getId() + + " to " + file.getAbsolutePath()); + } DataOutputStream outputStream = new DataOutputStream( new BufferedOutputStream(new FileOutputStream(file))); for (Vertex<I, V, E, M> vertex : partition) { @@ -431,7 +442,9 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * @return The path to the given partition */ private String getPartitionPath(Integer partitionId) { - return basePath + "/partition-" + partitionId; + int hash = hasher.hashInt(partitionId).asInt(); + int idx = Math.abs(hash % basePaths.length); + return basePaths[idx] + "/partition-" + partitionId; } /**
