Repository: giraph Updated Branches: refs/heads/trunk cc2fa8a4b -> 16dba64c3
unsafe readers for varints Summary: Varint encoding (and hence LongDiffNullArrayEdges) can be much faster when using UnsafeByteInput/Output. In fact, after the change iterating over LongDiffNullArrayEdges is almost as fast as iterating over LongNullArrayEdges: the difference is less than a few percent for jobs that require a lot of edge iterators, while without the change it is significant (over 20%). JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049 Test Plan: mvn clean install Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo Reviewed By: ikabiljo Differential Revision: https://reviews.facebook.net/D56169 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/16dba64c Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/16dba64c Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/16dba64c Branch: refs/heads/trunk Commit: 16dba64c31c3eb18ac916d49f1529fe4810fe754 Parents: cc2fa8a Author: spupyrev <[email protected]> Authored: Fri Apr 1 11:06:38 2016 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Fri Apr 1 11:06:56 2016 -0700 ---------------------------------------------------------------------- .../ImmutableClassesGiraphConfiguration.java | 14 ++++++ .../giraph/edge/LongDiffNullArrayEdges.java | 47 +++++++++++++------- .../java/org/apache/giraph/utils/Varint.java | 18 ++++---- 3 files changed, 53 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 967737c..38bf101 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -1221,6 +1221,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create an extended data input (can be subclassed) + * + * @param buf Buffer to use for the input + * @return ExtendedDataInput object + */ + public ExtendedDataInput createExtendedDataInput(byte[] buf) { + if (useUnsafeSerialization) { + return new UnsafeByteArrayInputStream(buf); + } else { + return new ExtendedByteArrayDataInput(buf); + } + } + + /** * Create extendedDataInput based on extendedDataOutput * * @param extendedDataOutput extendedDataOutput http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java index e82c3a8..37b85dd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java @@ -20,12 +20,8 @@ package org.apache.giraph.edge; import it.unimi.dsi.fastutil.bytes.ByteArrays; import it.unimi.dsi.fastutil.longs.LongArrayList; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.BitSet; @@ -33,11 +29,15 @@ import java.util.Iterator; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.EdgeIterables; +import 
org.apache.giraph.utils.ExtendedDataInput; +import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.giraph.utils.Trimmable; import org.apache.giraph.utils.Varint; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; import com.google.common.base.Preconditions; @@ -109,7 +109,7 @@ public class LongDiffNullArrayEdges long target = targetVertexId.get(); if (size > 0) { - LongsDiffReader reader = new LongsDiffReader(compressedData); + LongsDiffReader reader = new LongsDiffReader(compressedData, getConf()); for (int i = 0; i < size; i++) { long cur = reader.readNext(); if (cur == target) { @@ -146,7 +146,7 @@ public class LongDiffNullArrayEdges /** Current position in the array. */ private int position; private final LongsDiffReader reader = - new LongsDiffReader(compressedData); + new LongsDiffReader(compressedData, getConf()); /** Representative edge object. */ private final MutableEdge<LongWritable, NullWritable> representativeEdge = @@ -205,8 +205,8 @@ public class LongDiffNullArrayEdges int pCompressed = 0; int pTransient = 0; - LongsDiffReader reader = new LongsDiffReader(compressedData); - LongsDiffWriter writer = new LongsDiffWriter(); + LongsDiffReader reader = new LongsDiffReader(compressedData, getConf()); + LongsDiffWriter writer = new LongsDiffWriter(getConf()); long curValue = size > 0 ? 
reader.readNext() : Long.MAX_VALUE; @@ -275,7 +275,7 @@ public class LongDiffNullArrayEdges */ private static class LongsDiffReader { /** Input stream */ - private final DataInput input; + private final ExtendedDataInput input; /** last read value */ private long current; /** True if we haven't read any numbers yet */ @@ -283,10 +283,16 @@ public class LongDiffNullArrayEdges /** * Construct LongsDiffReader + * * @param compressedData Input byte array + * @param conf Conf */ - LongsDiffReader(byte[] compressedData) { - input = new DataInputStream(new ByteArrayInputStream(compressedData)); + public LongsDiffReader( + byte[] compressedData, + ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable> + conf + ) { + input = conf.createExtendedDataInput(compressedData); } /** @@ -312,17 +318,26 @@ public class LongDiffNullArrayEdges * Writing array of longs diff encoded into the byte array. */ private static class LongsDiffWriter { - /** Byte array stream containing result */ - private final ByteArrayOutputStream resultStream = - new ByteArrayOutputStream(); /** Wrapping resultStream into DataOutputStream */ - private final DataOutputStream out = new DataOutputStream(resultStream); + private final ExtendedDataOutput out; /** last value written */ private long lastWritten; /** True if we haven't written any numbers yet */ private boolean first = true; /** + * Construct LongsDiffWriter + * + * @param conf Conf + */ + public LongsDiffWriter( + ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable> + conf + ) { + out = conf.createExtendedDataOutput(); + } + + /** * Write next value to writer * @param value Value to be written */ @@ -350,7 +365,7 @@ public class LongDiffNullArrayEdges * @return resulting byte array */ byte[] toByteArray() { - return resultStream.toByteArray(); + return out.toByteArray(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java index 174d1f5..addcc2f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java @@ -40,8 +40,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import com.google.common.base.Preconditions; - /** * <p> * Encodes signed and unsigned values using a common variable-length scheme, @@ -103,10 +101,10 @@ public final class Varint { long value, DataOutput out ) throws IOException { - Preconditions.checkState( - value >= 0, - "Negative value passed into writeUnsignedVarLong - " + value - ); + if (value < 0) { + throw new IllegalStateException( + "Negative value passed into writeUnsignedVarLong - " + value); + } writeVarLong(value, out); } @@ -156,10 +154,10 @@ public final class Varint { int value, DataOutput out ) throws IOException { - Preconditions.checkState( - value >= 0, - "Negative value passed into writeUnsignedVarInt - " + value - ); + if (value < 0) { + throw new IllegalStateException( + "Negative value passed into writeUnsignedVarInt - " + value); + } writeVarInt(value, out); }
