Repository: giraph Updated Branches: refs/heads/trunk 5d0b81ac4 -> 92a8f1ca1
[GIRAPH-1003] Support Writable Basic2ObjectMap Summary: Basic2ObjectMap isn't, and cannot be, Writable (since the value object itself isn't Writable), which makes it drastically less useful, since most objects need to be serialized. But if we set aside the no-arg constructor, we can make it Writable by passing a valueWriter in through the constructor. Test Plan: mvn clean install Reviewers: sergey.edunov, dionysis.logothetis, maja.kabiljo Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D36987 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/92a8f1ca Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/92a8f1ca Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/92a8f1ca Branch: refs/heads/trunk Commit: 92a8f1ca1b472e539e4b9831679848cd108ab29a Parents: 5d0b81a Author: Igor Kabiljo <[email protected]> Authored: Wed Apr 15 14:31:52 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Wed Apr 15 14:31:52 2015 -0700 ---------------------------------------------------------------------- .../primitives/IdByteArrayMessageStore.java | 9 +- .../primitives/IdOneMessagePerVertexStore.java | 9 +- .../org/apache/giraph/types/ops/IntTypeOps.java | 11 ++- .../apache/giraph/types/ops/LongTypeOps.java | 11 ++- .../giraph/types/ops/PrimitiveIdTypeOps.java | 22 ++++- .../types/ops/collections/Basic2ObjectMap.java | 89 ++++++++++++-------- 6 files changed, 105 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java index efe6199..5342593 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java @@ -106,7 +106,8 @@ public class IdByteArrayMessageStore<I extends WritableComparable, service.getPartitionStore().getOrCreatePartition(partitionId); Basic2ObjectMap<I, DataInputOutput> partitionMap = idTypeOps.create2ObjectOpenHashMap( - Math.max(10, (int) partition.getVertexCount())); + Math.max(10, (int) partition.getVertexCount()), + dataInputOutputWriter); map.put(partitionId, partitionMap); service.getPartitionStore().putPartition((Partition) partition); @@ -221,15 +222,15 @@ public class IdByteArrayMessageStore<I extends WritableComparable, public void writePartition(DataOutput out, int partitionId) throws IOException { Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId); - partitionMap.write(out, dataInputOutputWriter); + partitionMap.write(out); } @Override public void readFieldsForPartition(DataInput in, int partitionId) throws IOException { Basic2ObjectMap<I, DataInputOutput> partitionMap = - idTypeOps.create2ObjectOpenHashMap(10); - partitionMap.readFields(in, dataInputOutputWriter); + idTypeOps.create2ObjectOpenHashMap(dataInputOutputWriter); + partitionMap.readFields(in); synchronized (map) { map.put(partitionId, partitionMap); } http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java index 1d2407c..373389d 100644 
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java @@ -109,7 +109,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, Partition<I, ?, ?> partition = service.getPartitionStore().getOrCreatePartition(partitionId); Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap( - (int) partition.getVertexCount()); + Math.max(10, (int) partition.getVertexCount()), messageWriter); map.put(partitionId, partitionMap); service.getPartitionStore().putPartition((Partition) partition); } @@ -202,14 +202,15 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, public void writePartition(DataOutput out, int partitionId) throws IOException { Basic2ObjectMap<I, M> partitionMap = map.get(partitionId); - partitionMap.write(out, messageWriter); + partitionMap.write(out); } @Override public void readFieldsForPartition(DataInput in, int partitionId) throws IOException { - Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap(10); - partitionMap.readFields(in, messageWriter); + Basic2ObjectMap<I, M> partitionMap = idTypeOps.create2ObjectOpenHashMap( + messageWriter); + partitionMap.readFields(in); synchronized (map) { map.put(partitionId, partitionMap); } http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java index 4ac06da..d159834 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java @@ -23,6 +23,7 @@ import 
org.apache.giraph.types.ops.collections.BasicArrayList; import org.apache.giraph.types.ops.collections.BasicArrayList.BasicIntArrayList; import org.apache.giraph.types.ops.collections.BasicSet; import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet; +import org.apache.giraph.types.ops.collections.WritableWriter; import org.apache.hadoop.io.IntWritable; /** TypeOps implementation for working with IntWritable type */ @@ -73,8 +74,14 @@ public enum IntTypeOps @Override public <V> Basic2ObjectMap<IntWritable, V> create2ObjectOpenHashMap( - int capacity) { - return new BasicInt2ObjectOpenHashMap<>(capacity); + WritableWriter<V> valueWriter) { + return new BasicInt2ObjectOpenHashMap<>(valueWriter); + } + + @Override + public <V> Basic2ObjectMap<IntWritable, V> create2ObjectOpenHashMap( + int capacity, WritableWriter<V> valueWriter) { + return new BasicInt2ObjectOpenHashMap<>(capacity, valueWriter); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java index 3e2ab31..741a7a9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java @@ -23,6 +23,7 @@ import org.apache.giraph.types.ops.collections.BasicArrayList; import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList; import org.apache.giraph.types.ops.collections.BasicSet; import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet; +import org.apache.giraph.types.ops.collections.WritableWriter; import org.apache.hadoop.io.LongWritable; /** TypeOps implementation for working with LongWritable type */ @@ -73,8 +74,14 @@ public enum LongTypeOps 
@Override public <V> Basic2ObjectMap<LongWritable, V> create2ObjectOpenHashMap( - int capacity) { - return new BasicLong2ObjectOpenHashMap<>(capacity); + WritableWriter<V> valueWriter) { + return new BasicLong2ObjectOpenHashMap<>(valueWriter); + } + + @Override + public <V> Basic2ObjectMap<LongWritable, V> create2ObjectOpenHashMap( + int capacity, WritableWriter<V> valueWriter) { + return new BasicLong2ObjectOpenHashMap<>(capacity, valueWriter); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java index 55b2737..b157885 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java @@ -19,6 +19,7 @@ package org.apache.giraph.types.ops; import org.apache.giraph.types.ops.collections.Basic2ObjectMap; import org.apache.giraph.types.ops.collections.BasicSet; +import org.apache.giraph.types.ops.collections.WritableWriter; @@ -50,12 +51,31 @@ public interface PrimitiveIdTypeOps<T> extends PrimitiveTypeOps<T> { BasicSet<T> createOpenHashSet(int capacity); /** + * Create Basic2ObjectMap with key type T. + * Values are represented as object, even if they can be primitive. + * + * You can pass null as valueWriter, + * but readFields/write will throw an Exception, if called. + * + * @param valueWriter Writer of values + * @param <V> Type of values in the map + * @return Basic2ObjectMap + */ + <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap( + WritableWriter<V> valueWriter); + + /** * Create Basic2ObjectMap with key type T, given capacity. * Values are represented as object, even if they can be primitive. 
* + * You can pass null as valueWriter, + * but readFields/write will throw an Exception, if called. + * * @param capacity Capacity + * @param valueWriter Writer of values * @param <V> Type of values in the map * @return Basic2ObjectMap */ - <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap(int capacity); + <V> Basic2ObjectMap<T, V> create2ObjectOpenHashMap( + int capacity, WritableWriter<V> valueWriter); } http://git-wip-us.apache.org/repos/asf/giraph/blob/92a8f1ca/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java index f7ef570..299dced 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java @@ -35,6 +35,7 @@ import org.apache.giraph.types.ops.LongTypeOps; import org.apache.giraph.types.ops.PrimitiveIdTypeOps; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; /** * Basic2ObjectMap with only basic set of operations. @@ -44,7 +45,7 @@ import org.apache.hadoop.io.LongWritable; * @param <K> Key type * @param <V> Value type */ -public abstract class Basic2ObjectMap<K, V> { +public abstract class Basic2ObjectMap<K, V> implements Writable { /** Removes all of the elements from this list. */ public abstract void clear(); /** @@ -99,31 +100,14 @@ public abstract class Basic2ObjectMap<K, V> { public abstract Iterator<K> fastKeyIterator(); /** - * Serializes the object, given a writer for values. - * @param out <code>DataOuput</code> to serialize object into. 
- * @param writer Writer of values - * @throws IOException - */ - public abstract void write(DataOutput out, WritableWriter<V> writer) - throws IOException; - /** - * Deserialize the object, given a writer for values. - * @param in <code>DataInput</code> to deseriablize object from. - * @param writer Writer of values - * @throws IOException - */ - public abstract void readFields(DataInput in, WritableWriter<V> writer) - throws IOException; - - /** * Iterator that reuses key object. * * @param <Iter> Primitive key iterator type */ protected abstract class ReusableIterator<Iter extends Iterator<?>> - implements Iterator<K> { + implements ResettableIterator<K> { /** Primitive Key iterator */ - protected final Iter iter; + protected Iter iter; /** Reusable key object */ protected final K reusableKey = getKeyTypeOps().create(); @@ -151,13 +135,29 @@ public abstract class Basic2ObjectMap<K, V> { extends Basic2ObjectMap<IntWritable, V> { /** Map */ private final Int2ObjectOpenHashMap<V> map; + /** Value writer */ + private final WritableWriter<V> valueWriter; + + /** + * Constructor + * + * @param valueWriter Writer of values + */ + public BasicInt2ObjectOpenHashMap(WritableWriter<V> valueWriter) { + this.map = new Int2ObjectOpenHashMap<>(); + this.valueWriter = valueWriter; + } /** * Constructor + * * @param capacity Capacity + * @param valueWriter Writer of values */ - public BasicInt2ObjectOpenHashMap(int capacity) { + public BasicInt2ObjectOpenHashMap( + int capacity, WritableWriter<V> valueWriter) { this.map = new Int2ObjectOpenHashMap<>(capacity); + this.valueWriter = valueWriter; } @Override @@ -203,31 +203,35 @@ public abstract class Basic2ObjectMap<K, V> { reusableKey.set(iter.nextInt()); return reusableKey; } + + @Override + public void reset() { + iter = map.keySet().iterator(); + } }; } @Override - public void write(DataOutput out, WritableWriter<V> writer) - throws IOException { + public void write(DataOutput out) throws IOException { 
out.writeInt(map.size()); ObjectIterator<Int2ObjectMap.Entry<V>> iterator = map.int2ObjectEntrySet().fastIterator(); while (iterator.hasNext()) { Int2ObjectMap.Entry<V> entry = iterator.next(); out.writeInt(entry.getIntKey()); - writer.write(out, entry.getValue()); + valueWriter.write(out, entry.getValue()); } } @Override - public void readFields(DataInput in, WritableWriter<V> writer) + public void readFields(DataInput in) throws IOException { int size = in.readInt(); map.clear(); map.trim(size); while (size-- > 0) { int key = in.readInt(); - V value = writer.readFields(in); + V value = valueWriter.readFields(in); map.put(key, value); } } @@ -238,13 +242,29 @@ public abstract class Basic2ObjectMap<K, V> { extends Basic2ObjectMap<LongWritable, V> { /** Map */ private final Long2ObjectOpenHashMap<V> map; + /** Value writer */ + private final WritableWriter<V> valueWriter; + + /** + * Constructor + * + * @param valueWriter Writer of values + */ + public BasicLong2ObjectOpenHashMap(WritableWriter<V> valueWriter) { + this.map = new Long2ObjectOpenHashMap<>(); + this.valueWriter = valueWriter; + } /** * Constructor + * * @param capacity Capacity + * @param valueWriter Writer of values */ - public BasicLong2ObjectOpenHashMap(int capacity) { + public BasicLong2ObjectOpenHashMap( + int capacity, WritableWriter<V> valueWriter) { this.map = new Long2ObjectOpenHashMap<>(capacity); + this.valueWriter = valueWriter; } @Override @@ -290,31 +310,34 @@ public abstract class Basic2ObjectMap<K, V> { reusableKey.set(iter.nextLong()); return reusableKey; } + + @Override + public void reset() { + iter = map.keySet().iterator(); + } }; } @Override - public void write(DataOutput out, WritableWriter<V> writer) - throws IOException { + public void write(DataOutput out) throws IOException { out.writeInt(map.size()); ObjectIterator<Long2ObjectMap.Entry<V>> iterator = map.long2ObjectEntrySet().fastIterator(); while (iterator.hasNext()) { Long2ObjectMap.Entry<V> entry = iterator.next(); 
out.writeLong(entry.getLongKey()); - writer.write(out, entry.getValue()); + valueWriter.write(out, entry.getValue()); } } @Override - public void readFields(DataInput in, WritableWriter<V> writer) - throws IOException { + public void readFields(DataInput in) throws IOException { int size = in.readInt(); map.clear(); map.trim(size); while (size-- > 0) { long key = in.readLong(); - V value = writer.readFields(in); + V value = valueWriter.readFields(in); map.put(key, value); } }
