Updated Branches:
  refs/heads/trunk a90747149 -> 8c86fa656

GIRAPH-508: Increase the limit on the number of partitions (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8c86fa65
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8c86fa65
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8c86fa65

Branch: refs/heads/trunk
Commit: 8c86fa656d1dafeaba3fe55ac184ff3cbaa3e324
Parents: a907471
Author: Maja Kabiljo <[email protected]>
Authored: Thu Feb 7 18:03:05 2013 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Thu Feb 7 18:06:03 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/conf/GiraphConfiguration.java    |    9 +++
 .../graph/AddressesAndPartitionsWritable.java      |   48 ++++++++++++++-
 .../giraph/partition/BasicPartitionOwner.java      |   34 ++++++++++
 .../giraph/partition/HashMasterPartitioner.java    |   35 ++++++++---
 .../apache/giraph/partition/PartitionOwner.java    |   26 ++++++++
 .../giraph/partition/RangePartitionOwner.java      |   16 +++++
 7 files changed, 160 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 808c51b..bd4ef0d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-508: Increase the limit on the number of partitions (majakabiljo)
+
   GIRAPH-509: Factor out AggregatorUsage (majakabiljo)
 
   GIRAPH-505: Metrics Updates (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 9ca1e7e..7e48103 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -594,6 +594,15 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Check if checkpointing is used
+   *
+   * @return True iff checkpointing is used
+   */
+  public boolean useCheckpointing() {
+    return getCheckpointFrequency() != 0;
+  }
+
+  /**
    * Set the max task attempts
    *
    * @param maxTaskAttempts Max task attempts to use

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
 
b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
index 990b04e..1139610 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
@@ -23,13 +23,16 @@ import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Helper class to write descriptions of master, workers and partition owners
@@ -105,9 +108,26 @@ public class AddressesAndPartitionsWritable implements 
Writable {
       workerInfo.write(output);
     }
 
+    Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
+    // Also write out the previous worker information that is used
+    // in the partition owners
+    List<WorkerInfo> previousWorkerInfos = Lists.newArrayList();
+    for (PartitionOwner partitionOwner : partitionOwners) {
+      if (partitionOwner.getPreviousWorkerInfo() != null) {
+        if (!workerInfoMap.containsKey(
+            partitionOwner.getPreviousWorkerInfo().getTaskId())) {
+          previousWorkerInfos.add(partitionOwner.getPreviousWorkerInfo());
+        }
+      }
+    }
+    output.writeInt(previousWorkerInfos.size());
+    for (WorkerInfo workerInfo : previousWorkerInfos) {
+      workerInfo.write(output);
+    }
+
     output.writeInt(partitionOwners.size());
     for (PartitionOwner partitionOwner : partitionOwners) {
-      partitionOwner.write(output);
+      partitionOwner.writeWithWorkerIds(output);
     }
   }
 
@@ -124,12 +144,20 @@ public class AddressesAndPartitionsWritable implements 
Writable {
       workerInfos.add(workerInfo);
     }
 
+    Map<Integer, WorkerInfo> workerInfoMap = getAsWorkerInfoMap(workerInfos);
+    int additionalWorkerInfos = input.readInt();
+    for (int i = 0; i < additionalWorkerInfos; i++) {
+      WorkerInfo workerInfo = new WorkerInfo();
+      workerInfo.readFields(input);
+      workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
+    }
+
     int partitionOwnersSize = input.readInt();
     partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
     for (int i = 0; i < partitionOwnersSize; i++) {
       try {
         PartitionOwner partitionOwner = partitionOwnerClass.newInstance();
-        partitionOwner.readFields(input);
+        partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
         partitionOwners.add(partitionOwner);
       } catch (InstantiationException e) {
         throw new IllegalStateException("readFields: " +
@@ -142,4 +170,20 @@ public class AddressesAndPartitionsWritable implements 
Writable {
       }
     }
   }
+
+  /**
+   * Convert Iterable of WorkerInfos to the map from task id to WorkerInfo.
+   *
+   * @param workerInfos Iterable of WorkerInfos
+   * @return The map from task id to WorkerInfo
+   */
+  private static Map<Integer, WorkerInfo> getAsWorkerInfoMap(
+      Iterable<WorkerInfo> workerInfos) {
+    Map<Integer, WorkerInfo> workerInfoMap =
+        Maps.newHashMapWithExpectedSize(Iterables.size(workerInfos));
+    for (WorkerInfo workerInfo : workerInfos) {
+      workerInfoMap.put(workerInfo.getTaskId(), workerInfo);
+    }
+    return workerInfoMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
index c1df04b..545d1af 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartitionOwner.java
@@ -21,6 +21,7 @@ package org.apache.giraph.partition;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -112,6 +113,39 @@ public class BasicPartitionOwner implements PartitionOwner,
   }
 
   @Override
+  public void writeWithWorkerIds(DataOutput output) throws IOException {
+    output.writeInt(partitionId);
+    output.writeInt(workerInfo.getTaskId());
+    if (previousWorkerInfo != null) {
+      output.writeInt(previousWorkerInfo.getTaskId());
+    } else {
+      output.writeInt(-1);
+    }
+    if (checkpointFilesPrefix != null) {
+      output.writeBoolean(true);
+      output.writeUTF(checkpointFilesPrefix);
+    } else {
+      output.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFieldsWithWorkerIds(DataInput input,
+      Map<Integer, WorkerInfo> workerInfoMap) throws IOException {
+    partitionId = input.readInt();
+    int workerId = input.readInt();
+    workerInfo = workerInfoMap.get(workerId);
+    int previousWorkerId = input.readInt();
+    if (previousWorkerId != -1) {
+      previousWorkerInfo = workerInfoMap.get(previousWorkerId);
+    }
+    boolean hasCheckpointFilePrefix = input.readBoolean();
+    if (hasCheckpointFilePrefix) {
+      checkpointFilesPrefix = input.readUTF();
+    }
+  }
+
+  @Override
   public void readFields(DataInput input) throws IOException {
     partitionId = input.readInt();
     workerInfo = new WorkerInfo();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
index fc56216..a9611d9 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
@@ -53,11 +53,6 @@ public class HashMasterPartitioner<I extends 
WritableComparable,
   public static final int DEFAULT_USER_PARTITION_COUNT = -1;
   /** Class logger */
   private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
-  /**
-   * ZooKeeper has a limit of the data in a single znode of 1 MB and
-   * each entry can go be on the average somewhat more than 300 bytes
-   */
-  private static final int MAX_PARTITIONS = 1024 * 1024 / 350;
   /** Provided configuration */
   private ImmutableClassesGiraphConfiguration conf;
   /** Specified partition count (overrides calculation) */
@@ -104,11 +99,12 @@ public class HashMasterPartitioner<I extends 
WritableComparable,
         (availableWorkerInfos.size() *
          availableWorkerInfos.size()) + " partitions.");
     }
-    if (partitionCount > MAX_PARTITIONS) {
+    int maxPartitions = getMaxPartitions();
+    if (partitionCount > maxPartitions) {
       LOG.warn("createInitialPartitionOwners: " +
-          "Reducing the partitionCount to " + MAX_PARTITIONS +
+          "Reducing the partitionCount to " + maxPartitions +
           " from " + partitionCount);
-      partitionCount = MAX_PARTITIONS;
+      partitionCount = maxPartitions;
     }
 
     for (int i = 0; i < partitionCount; ++i) {
@@ -154,4 +150,27 @@ public class HashMasterPartitioner<I extends 
WritableComparable,
   public PartitionStats createPartitionStats() {
     return new PartitionStats();
   }
+
+  /**
+   * Get the maximum number of partitions supported by Giraph.
+   *
+   * ZooKeeper has a limit of the data in a single znode of 1 MB,
+   * and we write all partition descriptions to the same znode.
+   *
+   * If we are not using checkpointing, each partition owner is serialized
+   * as 4 ints (16B), and we need some space to write the list of workers
+   * there. 50k partitions is conservative enough.
+   *
+   * When checkpointing is used, we need enough space to write all the
+   * checkpoint file paths.
+   *
+   * @return Maximum number of partitions allowed
+   */
+  private int getMaxPartitions() {
+    if (conf.useCheckpointing()) {
+      return 5000;
+    } else {
+      return 50000;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
index a886d79..0ac74da 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionOwner.java
@@ -21,6 +21,11 @@ package org.apache.giraph.partition;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
 /**
  * Metadata about ownership of a partition.
  */
@@ -78,4 +83,25 @@ public interface PartitionOwner extends Writable {
    * @param checkpointFilesPrefix HDFS checkpoint file prefix
    */
   void setCheckpointFilesPrefix(String checkpointFilesPrefix);
+
+  /**
+   * Write to the output, but don't serialize the whole WorkerInfo,
+   * instead use just the task id
+   *
+   * @param output Output to write to
+   * @throws IOException
+   */
+  void writeWithWorkerIds(DataOutput output) throws IOException;
+
+  /**
+   * The counterpart of the writeWithWorkerIds method - for WorkerInfos it
+   * will read just the task ids from the input and then find the matching
+   * WorkerInfo in the provided map and set it
+   *
+   * @param input Input to read from
+   * @param workerInfoMap Map from task id to WorkerInfo
+   * @throws IOException
+   */
+  void readFieldsWithWorkerIds(DataInput input,
+      Map<Integer, WorkerInfo> workerInfoMap) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8c86fa65/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
index 1ecedb8..e7e03dc 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
@@ -21,7 +21,9 @@ package org.apache.giraph.partition;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -71,4 +73,18 @@ public class RangePartitionOwner<I extends 
WritableComparable>
     super.write(output);
     maxIndex.write(output);
   }
+
+  @Override
+  public void writeWithWorkerIds(DataOutput output) throws IOException {
+    super.writeWithWorkerIds(output);
+    maxIndex.write(output);
+  }
+
+  @Override
+  public void readFieldsWithWorkerIds(DataInput input,
+      Map<Integer, WorkerInfo> workerInfoMap) throws IOException {
+    super.readFieldsWithWorkerIds(input, workerInfoMap);
+    maxIndex = (I) getConf().createVertexId();
+    maxIndex.readFields(input);
+  }
 }

Reply via email to