Repository: giraph
Updated Branches:
  refs/heads/trunk fafecee71 -> 4170eeb05

unsafe byte readers/writers

Summary: using unsafe readers/writers

Test Plan: tested on PageRank app, and Fanout computation. In both cases, there is a ~20% speedup

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D55509

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4170eeb0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4170eeb0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4170eeb0

Branch: refs/heads/trunk
Commit: 4170eeb054ba1414eceb08ec6c0fbdbdb17eb5c2
Parents: fafecee
Author: spupyrev <[email protected]>
Authored: Tue Mar 15 16:47:15 2016 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Tue Mar 15 16:48:42 2016 -0700

----------------------------------------------------------------------
 .../api/local/InternalMessageStore.java | 46 ++++++++++++--------
 1 file changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/4170eeb0/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
index 6c0cccb..92d9821 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
@@ -34,8 +34,8 @@ import org.apache.giraph.conf.MessageClasses;
 import org.apache.giraph.factories.MessageValueFactory;
 import 
org.apache.giraph.types.ops.TypeOps; import org.apache.giraph.types.ops.TypeOpsUtils; -import org.apache.giraph.utils.ExtendedByteArrayDataInput; -import org.apache.giraph.utils.ExtendedByteArrayDataOutput; +import org.apache.giraph.utils.ExtendedDataInput; +import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.UnsafeReusableByteArrayInput; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; @@ -69,7 +69,7 @@ interface InternalMessageStore abstract class InternalConcurrentMessageStore <I extends WritableComparable, M extends Writable, R> implements InternalMessageStore<I, M> { - protected final ConcurrentHashMap<I, R> received = + private final ConcurrentHashMap<I, R> received = new ConcurrentHashMap<>(); private final Class<I> idClass; @@ -102,6 +102,10 @@ interface InternalMessageStore return value; } + R removeFor(I id) { + return received.remove(id); + } + abstract R createNewReceiver(); @Override @@ -118,8 +122,9 @@ interface InternalMessageStore public static <I extends WritableComparable, M extends Writable> InternalMessageStore<I, M> createMessageStore( - final ImmutableClassesGiraphConfiguration<I, ?, ?> conf, - final MessageClasses<I, M> messageClasses) { + final ImmutableClassesGiraphConfiguration<I, ?, ?> conf, + final MessageClasses<I, M> messageClasses + ) { MessageCombiner<? 
super I, M> combiner = messageClasses.createMessageCombiner(conf); if (combiner != null) { @@ -132,8 +137,9 @@ interface InternalMessageStore messageClasses.createMessageValueFactory(conf)); } else { return new InternalByteMessageStore<>( - conf.getVertexIdClass(), - messageClasses.createMessageValueFactory(conf)); + conf.getVertexIdClass(), + messageClasses.createMessageValueFactory(conf), + conf); } } @@ -175,7 +181,7 @@ interface InternalMessageStore @Override public Iterable<M> takeMessages(I id) { - M message = received.remove(id); + M message = removeFor(id); if (message != null) { return Collections.singleton(message); } else { @@ -206,18 +212,22 @@ interface InternalMessageStore static class InternalByteMessageStore <I extends WritableComparable, M extends Writable> extends InternalConcurrentMessageStore<I, M, - ExtendedByteArrayDataOutput> { + ExtendedDataOutput> { private final MessageValueFactory<M> messageFactory; + private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf; public InternalByteMessageStore( - Class<I> idClass, MessageValueFactory<M> messageFactory) { + Class<I> idClass, MessageValueFactory<M> messageFactory, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf + ) { super(idClass); this.messageFactory = messageFactory; + this.conf = conf; } @Override public Iterable<M> takeMessages(I id) { - final ExtendedByteArrayDataOutput out = received.remove(id); + final ExtendedDataOutput out = removeFor(id); if (out == null) { return null; } @@ -225,8 +235,10 @@ interface InternalMessageStore return new Iterable<M>() { @Override public Iterator<M> iterator() { - final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput( - out.getByteArray(), 0, out.getPos()); + final ExtendedDataInput in = conf.createExtendedDataInput( + out.getByteArray(), 0, out.getPos() + ); + final M message = messageFactory.newInstance(); return new AbstractIterator<M>() { @Override @@ -248,7 +260,7 @@ interface InternalMessageStore @Override public void 
sendMessage(I id, M message) { - ExtendedByteArrayDataOutput out = getReceiverFor(id); + ExtendedDataOutput out = getReceiverFor(id); synchronized (out) { try { @@ -260,8 +272,8 @@ interface InternalMessageStore } @Override - ExtendedByteArrayDataOutput createNewReceiver() { - return new ExtendedByteArrayDataOutput(); + ExtendedDataOutput createNewReceiver() { + return conf.createExtendedDataOutput(); } } @@ -285,7 +297,7 @@ interface InternalMessageStore @Override public Iterable<M> takeMessages(I id) { - final List<byte[]> out = received.remove(id); + final List<byte[]> out = removeFor(id); if (out == null) { return null; }
