Repository: giraph
Updated Branches:
  refs/heads/trunk cc2fa8a4b -> 16dba64c3


unsafe readers for varints

Summary:
Varint encoding (and hence LongDiffNullArrayEdges) can be much faster when using
the unsafe byte-array streams (UnsafeByteArrayInputStream/OutputStream). After
the change, iterating over LongDiffNullArrayEdges is almost as fast as iterating
over LongNullArrayEdges: the difference is at most a few percent for jobs that
use many edge iterators, whereas without the change it is significant (over 20%).

JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049

Test Plan: mvn clean install

Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D56169


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/16dba64c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/16dba64c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/16dba64c

Branch: refs/heads/trunk
Commit: 16dba64c31c3eb18ac916d49f1529fe4810fe754
Parents: cc2fa8a
Author: spupyrev <[email protected]>
Authored: Fri Apr 1 11:06:38 2016 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Fri Apr 1 11:06:56 2016 -0700

----------------------------------------------------------------------
 .../ImmutableClassesGiraphConfiguration.java    | 14 ++++++
 .../giraph/edge/LongDiffNullArrayEdges.java     | 47 +++++++++++++-------
 .../java/org/apache/giraph/utils/Varint.java    | 18 ++++----
 3 files changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 967737c..38bf101 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1221,6 +1221,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create an extended data input (can be subclassed)
+   *
+   * @param buf Buffer to use for the input
+   * @return ExtendedDataInput object
+   */
+  public ExtendedDataInput createExtendedDataInput(byte[] buf) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayInputStream(buf);
+    } else {
+      return new ExtendedByteArrayDataInput(buf);
+    }
+  }
+
+  /**
    * Create extendedDataInput based on extendedDataOutput
    *
    * @param extendedDataOutput extendedDataOutput

http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
index e82c3a8..37b85dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
@@ -20,12 +20,8 @@ package org.apache.giraph.edge;
 import it.unimi.dsi.fastutil.bytes.ByteArrays;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -33,11 +29,15 @@ import java.util.Iterator;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.Trimmable;
 import org.apache.giraph.utils.Varint;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Preconditions;
 
@@ -109,7 +109,7 @@ public class LongDiffNullArrayEdges
     long target = targetVertexId.get();
 
     if (size > 0) {
-      LongsDiffReader reader = new LongsDiffReader(compressedData);
+      LongsDiffReader reader = new LongsDiffReader(compressedData, getConf());
       for (int i = 0; i < size; i++) {
         long cur = reader.readNext();
         if (cur == target) {
@@ -146,7 +146,7 @@ public class LongDiffNullArrayEdges
       /** Current position in the array. */
       private int position;
       private final LongsDiffReader reader =
-          new LongsDiffReader(compressedData);
+          new LongsDiffReader(compressedData, getConf());
 
       /** Representative edge object. */
       private final MutableEdge<LongWritable, NullWritable> representativeEdge =
@@ -205,8 +205,8 @@ public class LongDiffNullArrayEdges
     int pCompressed = 0;
     int pTransient = 0;
 
-    LongsDiffReader reader = new LongsDiffReader(compressedData);
-    LongsDiffWriter writer = new LongsDiffWriter();
+    LongsDiffReader reader = new LongsDiffReader(compressedData, getConf());
+    LongsDiffWriter writer = new LongsDiffWriter(getConf());
 
     long curValue = size > 0 ? reader.readNext() : Long.MAX_VALUE;
 
@@ -275,7 +275,7 @@ public class LongDiffNullArrayEdges
    */
   private static class LongsDiffReader {
     /** Input stream */
-    private final DataInput input;
+    private final ExtendedDataInput input;
     /** last read value */
     private long current;
     /** True if we haven't read any numbers yet */
@@ -283,10 +283,16 @@ public class LongDiffNullArrayEdges
 
     /**
      * Construct LongsDiffReader
+     *
      * @param compressedData Input byte array
+     * @param conf Conf
      */
-    LongsDiffReader(byte[] compressedData) {
-      input = new DataInputStream(new ByteArrayInputStream(compressedData));
+    public LongsDiffReader(
+      byte[] compressedData,
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable>
+        conf
+    ) {
+      input = conf.createExtendedDataInput(compressedData);
     }
 
     /**
@@ -312,17 +318,26 @@ public class LongDiffNullArrayEdges
    * Writing array of longs diff encoded into the byte array.
    */
   private static class LongsDiffWriter {
-    /** Byte array stream containing result */
-    private final ByteArrayOutputStream resultStream =
-        new ByteArrayOutputStream();
     /** Wrapping resultStream into DataOutputStream */
-    private final DataOutputStream out = new DataOutputStream(resultStream);
+    private final ExtendedDataOutput out;
     /** last value written */
     private long lastWritten;
     /** True if we haven't written any numbers yet */
     private boolean first = true;
 
     /**
+     * Construct LongsDiffWriter
+     *
+     * @param conf Conf
+     */
+    public LongsDiffWriter(
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable>
+        conf
+    ) {
+      out = conf.createExtendedDataOutput();
+    }
+
+    /**
      * Write next value to writer
      * @param value Value to be written
      */
@@ -350,7 +365,7 @@ public class LongDiffNullArrayEdges
      * @return resulting byte array
      */
     byte[] toByteArray() {
-      return resultStream.toByteArray();
+      return out.toByteArray();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/16dba64c/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
index 174d1f5..addcc2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java
@@ -40,8 +40,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
-
 /**
  * <p>
  * Encodes signed and unsigned values using a common variable-length scheme,
@@ -103,10 +101,10 @@ public final class Varint {
     long value,
     DataOutput out
   ) throws IOException {
-    Preconditions.checkState(
-      value >= 0,
-      "Negative value passed into writeUnsignedVarLong - " + value
-    );
+    if (value < 0) {
+      throw new IllegalStateException(
+        "Negative value passed into writeUnsignedVarLong - " + value);
+    }
     writeVarLong(value, out);
   }
 
@@ -156,10 +154,10 @@ public final class Varint {
     int value,
     DataOutput out
   ) throws IOException {
-    Preconditions.checkState(
-      value >= 0,
-      "Negative value passed into writeUnsignedVarInt - " + value
-    );
+    if (value < 0) {
+      throw new IllegalStateException(
+        "Negative value passed into writeUnsignedVarInt - " + value);
+    }
     writeVarInt(value, out);
   }
 

Reply via email to