Repository: giraph
Updated Branches:
  refs/heads/trunk 5d0b81ac4 -> 92a8f1ca1


[GIRAPH-1003] Support Writable Basic2ObjectMap

Summary:
Basic2ObjectMap isn't and cannot be writable (since object itself isn't 
writable), which makes it drastically
less useful - since most of the objects need to be serialized.

But if we don't count no-arg constructor - we can make it Writable - by having 
valueWriter
passed into through constructor.

Test Plan: mvn clean install

Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D36987


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/92a8f1ca
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/92a8f1ca
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/92a8f1ca

Branch: refs/heads/trunk
Commit: 92a8f1ca1b472e539e4b9831679848cd108ab29a
Parents: 5d0b81a
Author: Igor Kabiljo <[email protected]>
Authored: Wed Apr 15 14:31:52 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Wed Apr 15 14:31:52 2015 -0700

----------------------------------------------------------------------
 .../primitives/IdByteArrayMessageStore.java     |  9 +-
 .../primitives/IdOneMessagePerVertexStore.java  |  9 +-
 .../org/apache/giraph/types/ops/IntTypeOps.java | 11 ++-
 .../apache/giraph/types/ops/LongTypeOps.java    | 11 ++-
 .../giraph/types/ops/PrimitiveIdTypeOps.java    | 22 ++++-
 .../types/ops/collections/Basic2ObjectMap.java  | 89 ++++++++++++--------
 6 files changed, 105 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index efe6199..5342593 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -106,7 +106,8 @@ public class IdByteArrayMessageStore<I extends 
WritableComparable,
           service.getPartitionStore().getOrCreatePartition(partitionId);
       Basic2ObjectMap<I, DataInputOutput> partitionMap =
           idTypeOps.create2ObjectOpenHashMap(
-              Math.max(10, (int) partition.getVertexCount()));
+              Math.max(10, (int) partition.getVertexCount()),
+              dataInputOutputWriter);
 
       map.put(partitionId, partitionMap);
       service.getPartitionStore().putPartition((Partition) partition);
@@ -221,15 +222,15 @@ public class IdByteArrayMessageStore<I extends 
WritableComparable,
   public void writePartition(DataOutput out, int partitionId)
     throws IOException {
     Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId);
-    partitionMap.write(out, dataInputOutputWriter);
+    partitionMap.write(out);
   }
 
   @Override
   public void readFieldsForPartition(DataInput in, int partitionId)
     throws IOException {
     Basic2ObjectMap<I, DataInputOutput> partitionMap =
-        idTypeOps.create2ObjectOpenHashMap(10);
-    partitionMap.readFields(in, dataInputOutputWriter);
+        idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter);
+    partitionMap.readFields(in);
     synchronized (map) {
       map.put(partitionId, partitionMap);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index 1d2407c..373389d 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -109,7 +109,7 @@ public class IdOneMessagePerVertexStore<I extends 
WritableComparable,
       Partition<I, ?, ?> partition =
           service.getPartitionStore().getOrCreatePartition(partitionId);
       Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
-          (int) partition.getVertexCount());
+          Math.max(10, (int) partition.getVertexCount()), messageWriter);
       map.put(partitionId, partitionMap);
       service.getPartitionStore().putPartition((Partition) partition);
     }
@@ -202,14 +202,15 @@ public class IdOneMessagePerVertexStore<I extends 
WritableComparable,
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
     Basic2ObjectMap<I, M> partitionMap = map.get(partitionId);
-    partitionMap.write(out, messageWriter);
+    partitionMap.write(out);
   }
 
   @Override
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
-    Basic2ObjectMap<I, M> partitionMap = 
idTypeOps.create2ObjectOpenHashMap(10);
-    partitionMap.readFields(in, messageWriter);
+    Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(
+        messageWriter);
+    partitionMap.readFields(in);
     synchronized (map) {
       map.put(partitionId, partitionMap);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
index 4ac06da..d159834 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -23,6 +23,7 @@ import org.apache.giraph.types.ops.collections.BasicArrayList;
 import 
org.apache.giraph.types.ops.collections.BasicArrayList.BasicIntArrayList;
 import org.apache.giraph.types.ops.collections.BasicSet;
 import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet;
+import org.apache.giraph.types.ops.collections.WritableWriter;
 import org.apache.hadoop.io.IntWritable;
 
 /** TypeOps implementation for working with IntWritable type */
@@ -73,8 +74,14 @@ public enum IntTypeOps
 
   @Override
   public <V> Basic2ObjectMap<IntWritable, V> create2ObjectOpenHashMap(
-      int capacity) {
-    return new BasicInt2ObjectOpenHashMap<>(capacity);
+      WritableWriter<V> valueWriter) {
+    return new BasicInt2ObjectOpenHashMap<>(valueWriter);
+  }
+
+  @Override
+  public <V> Basic2ObjectMap<IntWritable, V> create2ObjectOpenHashMap(
+      int capacity, WritableWriter<V> valueWriter) {
+    return new BasicInt2ObjectOpenHashMap<>(capacity, valueWriter);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
index 3e2ab31..741a7a9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -23,6 +23,7 @@ import org.apache.giraph.types.ops.collections.BasicArrayList;
 import 
org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList;
 import org.apache.giraph.types.ops.collections.BasicSet;
 import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet;
+import org.apache.giraph.types.ops.collections.WritableWriter;
 import org.apache.hadoop.io.LongWritable;
 
 /** TypeOps implementation for working with LongWritable type */
@@ -73,8 +74,14 @@ public enum LongTypeOps
 
   @Override
   public <V> Basic2ObjectMap<LongWritable, V> create2ObjectOpenHashMap(
-      int capacity) {
-    return new BasicLong2ObjectOpenHashMap<>(capacity);
+      WritableWriter<V> valueWriter) {
+    return new BasicLong2ObjectOpenHashMap<>(valueWriter);
+  }
+
+  @Override
+  public <V> Basic2ObjectMap<LongWritable, V> create2ObjectOpenHashMap(
+      int capacity, WritableWriter<V> valueWriter) {
+    return new BasicLong2ObjectOpenHashMap<>(capacity, valueWriter);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
index 55b2737..b157885 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
@@ -19,6 +19,7 @@ package org.apache.giraph.types.ops;
 
 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
 import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.types.ops.collections.WritableWriter;
 
 
 
@@ -50,12 +51,31 @@ public interface PrimitiveIdTypeOps<T> extends 
PrimitiveTypeOps<T> {
   BasicSet<T> createOpenHashSet(int capacity);
 
   /**
+   * Create Basic2ObjectMap with key type T.
+   * Values are represented as object, even if they can be primitive.
+   *
+   * You can pass null as valueWriter,
+   * but readFields/write will throw an Exception, if called.
+   *
+   * @param valueWriter Writer of values
+   * @param <V> Type of values in the map
+   * @return Basic2ObjectMap
+   */
+  <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap(
+      WritableWriter<V> valueWriter);
+
+  /**
    * Create Basic2ObjectMap with key type T, given capacity.
    * Values are represented as object, even if they can be primitive.
    *
+   * You can pass null as valueWriter,
+   * but readFields/write will throw an Exception, if called.
+   *
    * @param capacity Capacity
+   * @param valueWriter Writer of values
    * @param <V> Type of values in the map
    * @return Basic2ObjectMap
    */
-  <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap(int capacity);
+  <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap(
+      int capacity, WritableWriter<V> valueWriter);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
index f7ef570..299dced 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
@@ -35,6 +35,7 @@ import org.apache.giraph.types.ops.LongTypeOps;
 import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 
 /**
  * Basic2ObjectMap with only basic set of operations.
@@ -44,7 +45,7 @@ import org.apache.hadoop.io.LongWritable;
  * @param <K> Key type
  * @param <V> Value type
  */
-public abstract class Basic2ObjectMap<K, V> {
+public abstract class Basic2ObjectMap<K, V> implements Writable {
   /** Removes all of the elements from this list. */
   public abstract void clear();
   /**
@@ -99,31 +100,14 @@ public abstract class Basic2ObjectMap<K, V> {
   public abstract Iterator<K> fastKeyIterator();
 
   /**
-   * Serializes the object, given a writer for values.
-   * @param out <code>DataOuput</code> to serialize object into.
-   * @param writer Writer of values
-   * @throws IOException
-   */
-  public abstract void write(DataOutput out, WritableWriter<V> writer)
-    throws IOException;
-  /**
-   * Deserialize the object, given a writer for values.
-   * @param in <code>DataInput</code> to deseriablize object from.
-   * @param writer Writer of values
-   * @throws IOException
-   */
-  public abstract void readFields(DataInput in, WritableWriter<V> writer)
-    throws IOException;
-
-  /**
    * Iterator that reuses key object.
    *
    * @param <Iter> Primitive key iterator type
    */
   protected abstract class ReusableIterator<Iter extends Iterator<?>>
-      implements Iterator<K> {
+      implements ResettableIterator<K> {
     /** Primitive Key iterator */
-    protected final Iter iter;
+    protected Iter iter;
     /** Reusable key object */
     protected final K reusableKey = getKeyTypeOps().create();
 
@@ -151,13 +135,29 @@ public abstract class Basic2ObjectMap<K, V> {
       extends Basic2ObjectMap<IntWritable, V> {
     /** Map */
     private final Int2ObjectOpenHashMap<V> map;
+    /** Value writer */
+    private final WritableWriter<V> valueWriter;
+
+    /**
+     * Constructor
+     *
+     * @param valueWriter Writer of values
+     */
+    public BasicInt2ObjectOpenHashMap(WritableWriter<V> valueWriter) {
+      this.map = new Int2ObjectOpenHashMap<>();
+      this.valueWriter = valueWriter;
+    }
 
     /**
      * Constructor
+     *
      * @param capacity Capacity
+     * @param valueWriter Writer of values
      */
-    public BasicInt2ObjectOpenHashMap(int capacity) {
+    public BasicInt2ObjectOpenHashMap(
+        int capacity, WritableWriter<V> valueWriter) {
       this.map = new Int2ObjectOpenHashMap<>(capacity);
+      this.valueWriter = valueWriter;
     }
 
     @Override
@@ -203,31 +203,35 @@ public abstract class Basic2ObjectMap<K, V> {
           reusableKey.set(iter.nextInt());
           return reusableKey;
         }
+
+        @Override
+        public void reset() {
+          iter = map.keySet().iterator();
+        }
       };
     }
 
     @Override
-    public void write(DataOutput out, WritableWriter<V> writer)
-      throws IOException {
+    public void write(DataOutput out) throws IOException {
       out.writeInt(map.size());
       ObjectIterator<Int2ObjectMap.Entry<V>> iterator =
           map.int2ObjectEntrySet().fastIterator();
       while (iterator.hasNext()) {
         Int2ObjectMap.Entry<V> entry = iterator.next();
         out.writeInt(entry.getIntKey());
-        writer.write(out, entry.getValue());
+        valueWriter.write(out, entry.getValue());
       }
     }
 
     @Override
-    public void readFields(DataInput in, WritableWriter<V> writer)
+    public void readFields(DataInput in)
       throws IOException {
       int size = in.readInt();
       map.clear();
       map.trim(size);
       while (size-- > 0) {
         int key = in.readInt();
-        V value = writer.readFields(in);
+        V value = valueWriter.readFields(in);
         map.put(key, value);
       }
     }
@@ -238,13 +242,29 @@ public abstract class Basic2ObjectMap<K, V> {
       extends Basic2ObjectMap<LongWritable, V> {
     /** Map */
     private final Long2ObjectOpenHashMap<V> map;
+    /** Value writer */
+    private final WritableWriter<V> valueWriter;
+
+    /**
+     * Constructor
+     *
+     * @param valueWriter Writer of values
+     */
+    public BasicLong2ObjectOpenHashMap(WritableWriter<V> valueWriter) {
+      this.map = new Long2ObjectOpenHashMap<>();
+      this.valueWriter = valueWriter;
+    }
 
     /**
      * Constructor
+     *
      * @param capacity Capacity
+     * @param valueWriter Writer of values
      */
-    public BasicLong2ObjectOpenHashMap(int capacity) {
+    public BasicLong2ObjectOpenHashMap(
+        int capacity, WritableWriter<V> valueWriter) {
       this.map = new Long2ObjectOpenHashMap<>(capacity);
+      this.valueWriter = valueWriter;
     }
 
     @Override
@@ -290,31 +310,34 @@ public abstract class Basic2ObjectMap<K, V> {
           reusableKey.set(iter.nextLong());
           return reusableKey;
         }
+
+        @Override
+        public void reset() {
+          iter = map.keySet().iterator();
+        }
       };
     }
 
     @Override
-    public void write(DataOutput out, WritableWriter<V> writer)
-      throws IOException {
+    public void write(DataOutput out) throws IOException {
       out.writeInt(map.size());
       ObjectIterator<Long2ObjectMap.Entry<V>> iterator =
           map.long2ObjectEntrySet().fastIterator();
       while (iterator.hasNext()) {
         Long2ObjectMap.Entry<V> entry = iterator.next();
         out.writeLong(entry.getLongKey());
-        writer.write(out, entry.getValue());
+        valueWriter.write(out, entry.getValue());
       }
     }
 
     @Override
-    public void readFields(DataInput in, WritableWriter<V> writer)
-      throws IOException {
+    public void readFields(DataInput in) throws IOException {
       int size = in.readInt();
       map.clear();
       map.trim(size);
       while (size-- > 0) {
         long key = in.readLong();
-        V value = writer.readFields(in);
+        V value = valueWriter.readFields(in);
         map.put(key, value);
       }
     }

Reply via email to