Updated Branches: refs/heads/trunk a90747149 -> 8c86fa656
GIRAPH-508: Increase the limit on the number of partitions (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8c86fa65 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8c86fa65 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8c86fa65 Branch: refs/heads/trunk Commit: 8c86fa656d1dafeaba3fe55ac184ff3cbaa3e324 Parents: a907471 Author: Maja Kabiljo <[email protected]> Authored: Thu Feb 7 18:03:05 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Thu Feb 7 18:06:03 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/conf/GiraphConfiguration.java | 9 +++ .../graph/AddressesAndPartitionsWritable.java | 48 ++++++++++++++- .../giraph/partition/BasicPartitionOwner.java | 34 ++++++++++ .../giraph/partition/HashMasterPartitioner.java | 35 ++++++++--- .../apache/giraph/partition/PartitionOwner.java | 26 ++++++++ .../giraph/partition/RangePartitionOwner.java | 16 +++++ 7 files changed, 160 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 808c51b..bd4ef0d 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-508: Increase the limit on the number of partitions (majakabiljo) + GIRAPH-509: Factor out AggregatorUsage (majakabiljo) GIRAPH-505: Metrics Updates (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 9ca1e7e..7e48103 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -594,6 +594,15 @@ public class GiraphConfiguration extends Configuration } /** + * Check if checkpointing is used + * + * @return True iff checkpointing is used + */ + public boolean useCheckpointing() { + return getCheckpointFrequency() != 0; + } + + /** * Set the max task attempts * * @param maxTaskAttempts Max task attempts to use http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java index 990b04e..1139610 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java @@ -23,13 +23,16 @@ import org.apache.giraph.master.MasterInfo; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; /** * Helper class to write descriptions of master, workers and partition owners @@ -105,9 +108,26 @@ public class AddressesAndPartitionsWritable implements Writable { workerInfo.write(output); } + Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos); + // Also write out the previous worker information that are used + // in the 
partition owners + List<WorkerInfo> previousWorkerInfos = Lists.newArrayList(); + for (PartitionOwner partitionOwner : partitionOwners) { + if (partitionOwner.getPreviousWorkerInfo() != null) { + if (!workerInfoMap.containsKey( + partitionOwner.getPreviousWorkerInfo().getTaskId())) { + previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo()); + } + } + } + output.writeInt(previousWorkerInfos.size()); + for (WorkerInfo workerInfo : previousWorkerInfos) { + workerInfo.write(output); + } + output.writeInt(partitionOwners.size()); for (PartitionOwner partitionOwner : partitionOwners) { - partitionOwner.write(output); + partitionOwner.writeWithWorkerIds(output); } } @@ -124,12 +144,20 @@ public class AddressesAndPartitionsWritable implements Writable { workerInfos.add(workerInfo); } + Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos); + int additionalWorkerInfos = input.readInt(); + for (int i = 0; i < additionalWorkerInfos; i++) { + WorkerInfo workerInfo = new WorkerInfo(); + workerInfo.readFields(input); + workerInfoMap.put(workerInfo.getTaskId(), workerInfo); + } + int partitionOwnersSize = input.readInt(); partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize); for (int i = 0; i < partitionOwnersSize; i++) { try { PartitionOwner partitionOwner = partitionOwnerClass.newInstance(); - partitionOwner.readFields(input); + partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap); partitionOwners.add(partitionOwner); } catch (InstantiationException e) { throw new IllegalStateException("readFields: " + @@ -142,4 +170,20 @@ public class AddressesAndPartitionsWritable implements Writable { } } } + + /** + * Convert Iterable of WorkerInfos to the map from task id to WorkerInfo. 
+ * + * @param workerInfos Iterable of WorkerInfos + * @return The map from task id to WorkerInfo + */ + private static Map<Integer, WorkerInfo> getAsWorkerInfoMap( + Iterable<WorkerInfo> workerInfos) { + Map<Integer, WorkerInfo> workerInfoMap = + Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos)); + for (WorkerInfo workerInfo : workerInfos) { + workerInfoMap.put(workerInfo.getTaskId(), workerInfo); + } + return workerInfoMap; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java index c1df04b..545d1af 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java @@ -21,6 +21,7 @@ package org.apache.giraph.partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -112,6 +113,39 @@ public class BasicPartitionOwner implements PartitionOwner, } @Override + public void writeWithWorkerIds(DataOutput output) throws IOException { + output.writeInt(partitionId); + output.writeInt(workerInfo.getTaskId()); + if (previousWorkerInfo != null) { + output.writeInt(previousWorkerInfo.getTaskId()); + } else { + output.writeInt(-1); + } + if (checkpointFilesPrefix != null) { + output.writeBoolean(true); + output.writeUTF(checkpointFilesPrefix); + } else { + output.writeBoolean(false); + } + } + + @Override + public void readFieldsWithWorkerIds(DataInput input, + Map<Integer, WorkerInfo> workerInfoMap) throws IOException { + partitionId = 
input.readInt(); + int workerId = input.readInt(); + workerInfo = workerInfoMap.get(workerId); + int previousWorkerId = input.readInt(); + if (previousWorkerId != -1) { + previousWorkerInfo = workerInfoMap.get(previousWorkerId); + } + boolean hasCheckpointFilePrefix = input.readBoolean(); + if (hasCheckpointFilePrefix) { + checkpointFilesPrefix = input.readUTF(); + } + } + + @Override public void readFields(DataInput input) throws IOException { partitionId = input.readInt(); workerInfo = new WorkerInfo(); http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java index fc56216..a9611d9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java @@ -53,11 +53,6 @@ public class HashMasterPartitioner<I extends WritableComparable, public static final int DEFAULT_USER_PARTITION_COUNT = -1; /** Class logger */ private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class); - /** - * ZooKeeper has a limit of the data in a single znode of 1 MB and - * each entry can go be on the average somewhat more than 300 bytes - */ - private static final int MAX_PARTITIONS = 1024 * 1024 / 350; /** Provided configuration */ private ImmutableClassesGiraphConfiguration conf; /** Specified partition count (overrides calculation) */ @@ -104,11 +99,12 @@ public class HashMasterPartitioner<I extends WritableComparable, (availableWorkerInfos.size() * availableWorkerInfos.size()) + " partitions."); } - if (partitionCount > MAX_PARTITIONS) { + int maxPartitions = getMaxPartitions(); + if (partitionCount > maxPartitions) { 
LOG.warn("createInitialPartitionOwners: " + - "Reducing the partitionCount to " + MAX_PARTITIONS + + "Reducing the partitionCount to " + maxPartitions + " from " + partitionCount); - partitionCount = MAX_PARTITIONS; + partitionCount = maxPartitions; } for (int i = 0; i < partitionCount; ++i) { @@ -154,4 +150,27 @@ public class HashMasterPartitioner<I extends WritableComparable, public PartitionStats createPartitionStats() { return new PartitionStats(); } + + /** + * Get the maximum number of partitions supported by Giraph. + * + * ZooKeeper limits the data stored in a single znode to 1 MB, + * and we write all partition descriptions to the same znode. + * + * If we are not using checkpointing, a BasicPartitionOwner is serialized + * as three ints and a boolean (13B), and we also need some space for the + * list of workers. 50k partitions is conservative enough. + * + * When checkpointing is used, we also need space for every partition's + * checkpoint file prefix, so the limit is lower. + * + * @return Maximum number of partitions allowed + */ + private int getMaxPartitions() { + if (conf.useCheckpointing()) { + return 5000; + } else { + return 50000; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java index a886d79..0ac74da 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java @@ -21,6 +21,11 @@ package org.apache.giraph.partition; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + /** * Metadata about ownership of a partition.
*/ @@ -78,4 +83,25 @@ public interface PartitionOwner extends Writable { * @param checkpointFilesPrefix HDFS checkpoint file prefix */ void setCheckpointFilesPrefix(String checkpointFilesPrefix); + + /** + * Write to the output, but instead of serializing each whole WorkerInfo, + * write only its task id. + * + * @param output Output to write to + * @throws IOException if writing to the output fails + */ + void writeWithWorkerIds(DataOutput output) throws IOException; + + /** + * Counterpart of writeWithWorkerIds: reads only the task ids of the + * WorkerInfos from the input, then looks up and sets the matching + * WorkerInfo from the provided map. + * + * @param input Input to read from + * @param workerInfoMap Map from task id to WorkerInfo + * @throws IOException if reading from the input fails + */ + void readFieldsWithWorkerIds(DataInput input, + Map<Integer, WorkerInfo> workerInfoMap) throws IOException; } http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java index 1ecedb8..e7e03dc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java @@ -21,7 +21,9 @@ package org.apache.giraph.partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; +import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.WritableComparable; /** @@ -71,4 +73,18 @@ public class RangePartitionOwner<I extends WritableComparable> super.write(output); maxIndex.write(output); } + + @Override + public void writeWithWorkerIds(DataOutput output) throws IOException { + super.writeWithWorkerIds(output); + maxIndex.write(output); + } + + @Override
public void readFieldsWithWorkerIds(DataInput input, + Map<Integer, WorkerInfo> workerInfoMap) throws IOException { + super.readFieldsWithWorkerIds(input, workerInfoMap); + maxIndex = (I) getConf().createVertexId(); + maxIndex.readFields(input); + } }
