http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
new file mode 100644
index 0000000..1d8fc26
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayEdges.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.MutableEdge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A list of edges backed by a byte-array.
+ * The same Edge object is reused when iterating over all edges,
+ * unless an alternative iterable is requested.
+ * It automatically optimizes for edges with no value,
+ * and also supports shallow copy from another instance.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+public class ByteArrayEdges<I extends WritableComparable, E extends Writable>
+    implements Iterable<Edge<I, E>>, Writable {
+  /** Representative edge object. */
+  private MutableEdge<I, E> representativeEdge;
+  /** Serialized edges. */
+  private byte[] serializedEdges;
+  /** Number of bytes used in serializedEdges. */
+  private int serializedEdgesBytesUsed;
+  /** Number of edges. */
+  private int edgeCount;
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration<I, ?, E, ?> configuration;
+
+  /**
+   * Constructor.
+   * Depending on the configuration, instantiates a representative edge with
+   * or without an edge value.
+   *
+   * @param conf Configuration
+   */
+  public ByteArrayEdges(ImmutableClassesGiraphConfiguration<I, ?, E, ?> conf) {
+    configuration = conf;
+    representativeEdge = configuration.createMutableEdge();
+    ExtendedDataOutput extendedOutputStream =
+        configuration.createExtendedDataOutput();
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+  }
+
+  /**
+   * Constructor.
+   * Takes another instance of {@link ByteArrayEdges} and makes a shallow
+   * copy of it.
+   *
+   * @param edges {@link ByteArrayEdges} to copy
+   */
+  public ByteArrayEdges(ByteArrayEdges<I, E> edges) {
+    representativeEdge = edges.representativeEdge;
+    serializedEdges = edges.serializedEdges;
+    serializedEdgesBytesUsed = edges.serializedEdgesBytesUsed;
+    edgeCount = edges.edgeCount;
+    configuration = edges.configuration;
+  }
+
+  /**
+   * Append an edge to the serialized representation.
+   *
+   * @param edge Edge to append
+   */
+  public final void appendEdge(Edge<I, E> edge) {
+    ExtendedDataOutput extendedDataOutput =
+        configuration.createExtendedDataOutput(
+            serializedEdges, serializedEdgesBytesUsed);
+    try {
+      WritableUtils.writeEdge(extendedDataOutput, edge);
+    } catch (IOException e) {
+      throw new IllegalStateException("append: Failed to write to the new " +
+          "byte array");
+    }
+    serializedEdges = extendedDataOutput.getByteArray();
+    serializedEdgesBytesUsed = extendedDataOutput.getPos();
+    ++edgeCount;
+  }
+
+  /**
+   * Set all the edges.
+   * Note: when possible, use the constructor which takes another {@link
+   * ByteArrayEdges} instead of a generic {@link Iterable}.
+   *
+   * @param edges Iterable of edges
+   */
+  public final void setEdges(Iterable<Edge<I, E>> edges) {
+    ExtendedDataOutput extendedOutputStream =
+        configuration.createExtendedDataOutput();
+    if (edges != null) {
+      for (Edge<I, E> edge : edges) {
+        try {
+          WritableUtils.writeEdge(extendedOutputStream, edge);
+        } catch (IOException e) {
+          throw new IllegalStateException("setEdges: Failed to serialize " +
+              edge);
+        }
+        ++edgeCount;
+      }
+    }
+    serializedEdges = extendedOutputStream.getByteArray();
+    serializedEdgesBytesUsed = extendedOutputStream.getPos();
+  }
+
+  /**
+   * Remove the first edge pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return True if one such edge was found and removed.
+   */
+  public final boolean removeFirstEdge(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges).
+    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
+    int foundStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
+            serializedEdges, foundStartOffset,
+            serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
+        serializedEdgesBytesUsed -=
+            iterator.extendedDataInput.getPos() - foundStartOffset;
+        --edgeCount;
+        return true;
+      }
+      foundStartOffset = iterator.extendedDataInput.getPos();
+    }
+
+    return false;
+  }
+
+  /**
+   * Remove all edges pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return The number of removed edges
+   */
+  public final int removeAllEdges(I targetVertexId) {
+    // Note that this is very expensive (deserializes all edges).
+    ByteArrayEdgeIterator iterator = new ByteArrayEdgeIterator();
+    int removedCount = 0;
+    List<Integer> foundStartOffsets = new LinkedList<Integer>();
+    List<Integer> foundEndOffsets = new LinkedList<Integer>();
+    int lastStartOffset = 0;
+    while (iterator.hasNext()) {
+      Edge<I, E> edge = iterator.next();
+      if (edge.getTargetVertexId().equals(targetVertexId)) {
+        foundStartOffsets.add(lastStartOffset);
+        foundEndOffsets.add(iterator.extendedDataInput.getPos());
+        ++removedCount;
+      }
+      lastStartOffset = iterator.extendedDataInput.getPos();
+    }
+    foundStartOffsets.add(serializedEdgesBytesUsed);
+
+    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
+    Integer foundStartOffset = foundStartOffsetIter.next();
+    for (Integer foundEndOffset : foundEndOffsets) {
+      Integer nextFoundStartOffset = foundStartOffsetIter.next();
+      System.arraycopy(serializedEdges, foundEndOffset,
+          serializedEdges, foundStartOffset,
+          nextFoundStartOffset - foundEndOffset);
+      serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
+      foundStartOffset = nextFoundStartOffset;
+    }
+
+    edgeCount -= removedCount;
+    return removedCount;
+  }
+
+  public final int getNumEdges() {
+    return edgeCount;
+  }
+
+  /**
+   * Iterator that uses the representative edge (only one iterator allowed
+   * at a time).
+   */
+  private final class ByteArrayEdgeIterator implements Iterator<Edge<I, E>> {
+    /** Input for processing the bytes */
+    private final ExtendedDataInput extendedDataInput;
+
+    /** Constructor. */
+    ByteArrayEdgeIterator() {
+      extendedDataInput = configuration.createExtendedDataInput(
+          serializedEdges, 0, serializedEdgesBytesUsed);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return serializedEdges != null && extendedDataInput.available() > 0;
+    }
+
+    @Override
+    public Edge<I, E> next() {
+      try {
+        WritableUtils.readEdge(extendedDataInput, representativeEdge);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: Failed on pos " +
+            extendedDataInput.getPos() + " edge " + representativeEdge);
+      }
+      return representativeEdge;
+    }
+
+    @Override
+    public void remove() {
+      throw new IllegalAccessError("remove: Not supported");
+    }
+  }
+
+  @Override
+  public final Iterator<Edge<I, E>> iterator() {
+    return new ByteArrayEdgeIterator();
+  }
+
+  /**
+   * Release and return the current representative edge.
+   *
+   * @return The released edge
+   */
+  private Edge<I, E> releaseCurrentEdge() {
+    Edge<I, E> releasedEdge = representativeEdge;
+    representativeEdge = configuration.createMutableEdge();
+    return releasedEdge;
+  }
+
+  /**
+   * Get an iterable wrapper that creates new Edge objects on the fly.
+   *
+   * @return Edge iteratable that creates new objects
+   */
+  public final Iterable<Edge<I, E>> copyEdgeIterable() {
+    return Iterables.transform(this,
+        new Function<Edge<I, E>, Edge<I, E>>() {
+          @Override
+          public Edge<I, E> apply(Edge<I, E> input) {
+            return releaseCurrentEdge();
+          }
+        });
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    serializedEdgesBytesUsed = in.readInt();
+    // Only create a new buffer if the old one isn't big enough
+    if (serializedEdges == null ||
+        serializedEdgesBytesUsed > serializedEdges.length) {
+      serializedEdges = new byte[serializedEdgesBytesUsed];
+    }
+    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
+    edgeCount = in.readInt();
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    out.writeInt(serializedEdgesBytesUsed);
+    out.write(serializedEdges, 0, serializedEdgesBytesUsed);
+    out.writeInt(edgeCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
new file mode 100644
index 0000000..bd464d5
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdData.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Stores pairs of vertex id and generic data in a single byte array
+ *
+ * @param <I> Vertex id
+ * @param <T> Data
+ */
+public abstract class ByteArrayVertexIdData<I extends WritableComparable, T>
+  implements Writable, ImmutableClassesGiraphConfigurable {
+  /** Extended data output */
+  private ExtendedDataOutput extendedDataOutput;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration;
+
+  /**
+   * Create a new data object.
+   *
+   * @return Newly-created data object.
+   */
+  public abstract T createData();
+
+  /**
+   * Write a data object to an {@link ExtendedDataOutput}.
+   *
+   * @param out {@link ExtendedDataOutput}
+   * @param data Data object to write
+   * @throws IOException
+   */
+  public abstract void writeData(ExtendedDataOutput out, T data)
+    throws IOException;
+
+  /**
+   * Read a data object's fields from an {@link ExtendedDataInput}.
+   *
+   * @param in {@link ExtendedDataInput}
+   * @param data Data object to fill in-place
+   * @throws IOException
+   */
+  public abstract void readData(ExtendedDataInput in, T data)
+    throws IOException;
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  public void initialize() {
+    extendedDataOutput = configuration.createExtendedDataOutput();
+  }
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  public void initialize(int expectedSize) {
+    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
+  }
+
+  /**
+   * Add a vertex id and data pair to the collection.
+   *
+   * @param vertexId Vertex id
+   * @param data Data
+   */
+  public void add(I vertexId, T data) {
+    try {
+      vertexId.write(extendedDataOutput);
+      writeData(extendedDataOutput, data);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  /**
+   * Get the number of bytes used.
+   *
+   * @return Bytes used
+   */
+  public int getSize() {
+    return extendedDataOutput.getPos();
+  }
+
+  /**
+   * Get the size of this object in serialized form.
+   *
+   * @return The size (in bytes) of the serialized object
+   */
+  public int getSerializedSize() {
+    return 1 + 4 + getSize();
+  }
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return Whether the list is empty
+   */
+  public boolean isEmpty() {
+    return extendedDataOutput.getPos() == 0;
+  }
+
+  /**
+   * Clear the list.
+   */
+  public void clear() {
+    extendedDataOutput.reset();
+  }
+
+  /**
+   * Get the underlying byte-array.
+   *
+   * @return The underlying byte-array
+   */
+  public byte[] getByteArray() {
+    return extendedDataOutput.getByteArray();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeInt(extendedDataOutput.getPos());
+    dataOutput.write(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    int size = dataInput.readInt();
+    byte[] buf = new byte[size];
+    dataInput.readFully(buf);
+    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+  }
+
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  public VertexIdDataIterator getVertexIdDataIterator() {
+    return new VertexIdDataIterator();
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and data objects so that the
+   * lifetime of the object is only until next() is called.
+   *
+   * Vertex id ownership can be released if desired through
+   * releaseCurrentVertexId().  This optimization allows us to cut down
+   * on the number of objects instantiated and garbage collected.
+   *
+   * Not thread-safe.
+   */
+  public class VertexIdDataIterator extends VertexIdIterator<I> {
+    /** Current data. */
+    private T data;
+
+    /** Default constructor. */
+    public VertexIdDataIterator() {
+      super(extendedDataOutput, configuration);
+    }
+
+    @Override
+    public void next() {
+      if (vertexId == null) {
+        vertexId = configuration.createVertexId();
+      }
+      if (data == null) {
+        data = createData();
+      }
+      try {
+        vertexId.readFields(extendedDataInput);
+        readData(extendedDataInput, data);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Get the current data.
+     *
+     * @return Current data
+     */
+    public T getCurrentData() {
+      return data;
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
new file mode 100644
index 0000000..1cfd21e
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdEdges.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores vertex id and edge pairs in a single byte array.
+ *
+ * @param <I> Vertex id
+ * @param <E> Edge value
+ */
+@SuppressWarnings("unchecked")
+public class ByteArrayVertexIdEdges<I extends WritableComparable,
+    E extends Writable> extends ByteArrayVertexIdData<I, Edge<I, E>> {
+  /**
+   * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
+   * to generate edge objects.
+   *
+   * @return Casted configuration
+   */
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, ?, E, ?> getConf() {
+    return (ImmutableClassesGiraphConfiguration<I, ?, E, ?>) super.getConf();
+  }
+
+  @Override
+  public Edge<I, E> createData() {
+    return getConf().createMutableEdge();
+  }
+
+  @Override
+  public void writeData(ExtendedDataOutput out, Edge<I, E> edge)
+    throws IOException {
+    WritableUtils.writeEdge(out, edge);
+  }
+
+  @Override
+  public void readData(ExtendedDataInput in, Edge<I, E> edge)
+    throws IOException {
+    WritableUtils.readEdge(in, edge);
+  }
+
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  public VertexIdEdgeIterator getVertexIdEdgeIterator() {
+    return new VertexIdEdgeIterator();
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and edge objects so that the
+   * lifetime of the object is only until next() is called.
+   */
+  public class VertexIdEdgeIterator extends VertexIdDataIterator {
+    public Edge<I, E> getCurrentEdge() {
+      return getCurrentData();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
index dea4229..fd06783 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -17,115 +17,75 @@
  */
 package org.apache.giraph.utils;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /**
- * Stores vertex id and message pairs in a single byte array
+ * Stores vertex id and message pairs in a single byte array.
  *
  * @param <I> Vertex id
  * @param <M> Message data
  */
+@SuppressWarnings("unchecked")
 public class ByteArrayVertexIdMessages<I extends WritableComparable,
-    M extends Writable> implements Writable,
-    ImmutableClassesGiraphConfigurable {
-  /** Extended data output */
-  private ExtendedDataOutput extendedDataOutput;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
+    M extends Writable> extends ByteArrayVertexIdData<I, M> {
   /** Add the message size to the stream? (Depends on the message store) */
   private boolean useMessageSizeEncoding = false;
 
   /**
-   * Constructor for reflection
-   */
-  public ByteArrayVertexIdMessages() { }
-
-  /**
    * Set whether message sizes should be encoded.  This should only be a
    * possibility when not combining.  When combining, all messages need to be
    * deserializd right away, so this won't help.
    */
   private void setUseMessageSizeEncoding() {
-    if (!configuration.useCombiner()) {
-      useMessageSizeEncoding = configuration.useMessageSizeEncoding();
+    if (!getConf().useCombiner()) {
+      useMessageSizeEncoding = getConf().useMessageSizeEncoding();
     } else {
       useMessageSizeEncoding = false;
     }
   }
 
   /**
-   * Initialize the inner state. Must be called before {@code add()} is
-   * called.
+   * Cast the {@link ImmutableClassesGiraphConfiguration} so it can be used
+   * to generate message objects.
+   *
+   * @return Casted configuration
    */
-  public void initialize() {
-    extendedDataOutput = configuration.createExtendedDataOutput();
-    setUseMessageSizeEncoding();
+  @Override
+  public ImmutableClassesGiraphConfiguration<I, ?, ?, M> getConf() {
+    return (ImmutableClassesGiraphConfiguration<I, ?, ?, M>) super.getConf();
   }
 
-  /**
-   * Initialize the inner state, with a known size. Must be called before
-   * {@code add()} is called.
-   *
-   * @param expectedSize Number of bytes to be expected
-   */
-  public void initialize(int expectedSize) {
-    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
-    setUseMessageSizeEncoding();
+  @Override
+  public M createData() {
+    return getConf().createMessageValue();
   }
 
-  /**
-   * Add a vertex id and message pair to the collection.
-   *
-   * @param vertexId Vertex id
-   * @param message Message
-   */
-  public void add(I vertexId, M message) {
-    try {
-      vertexId.write(extendedDataOutput);
-      // Write the size if configured this way, else, just the message
-      if (useMessageSizeEncoding) {
-        int pos = extendedDataOutput.getPos();
-        extendedDataOutput.skipBytes(4);
-        message.write(extendedDataOutput);
-        extendedDataOutput.writeInt(
-            pos, extendedDataOutput.getPos() - pos - 4);
-      } else {
-        message.write(extendedDataOutput);
-      }
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
+  @Override
+  public void writeData(ExtendedDataOutput out, M message) throws IOException {
+    message.write(out);
   }
 
-  /**
-   * Get the number of bytes used
-   *
-   * @return Number of bytes used
-   */
-  public int getSize() {
-    return extendedDataOutput.getPos();
+  @Override
+  public void readData(ExtendedDataInput in, M message) throws IOException {
+    message.readFields(in);
   }
 
-  /**
-   * Check if the list is empty.
-   *
-   * @return True iff there are no pairs in the list
-   */
-  public boolean isEmpty() {
-    return extendedDataOutput.getPos() == 0;
+  @Override
+  public void initialize() {
+    super.initialize();
+    setUseMessageSizeEncoding();
   }
 
-  /**
-   * Clear the collection.
-   */
-  public void clear() {
-    extendedDataOutput.reset();
+  @Override
+  public void initialize(int expectedSize) {
+    super.initialize(expectedSize);
+    setUseMessageSizeEncoding();
   }
 
   /**
@@ -140,6 +100,16 @@ public class ByteArrayVertexIdMessages<I extends 
WritableComparable,
   }
 
   /**
+   * Special iterator that reuses vertex ids and message objects so that the
+   * lifetime of the object is only until next() is called.
+   */
+  public class VertexIdMessageIterator extends VertexIdDataIterator {
+    public M getCurrentMessage() {
+      return getCurrentData();
+    }
+  }
+
+  /**
    * Get specialized iterator that will instiantiate the vertex id and
    * message of this object.  It will only produce message bytes, not actual
    * messages and expects a different encoding.
@@ -155,128 +125,15 @@ public class ByteArrayVertexIdMessages<I extends 
WritableComparable,
   }
 
   @Override
-  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return configuration;
-  }
-
-  @Override
   public void write(DataOutput dataOutput) throws IOException {
     dataOutput.writeBoolean(useMessageSizeEncoding);
-    dataOutput.writeInt(extendedDataOutput.getPos());
-    dataOutput.write(extendedDataOutput.getByteArray(), 0,
-        extendedDataOutput.getPos());
+    super.write(dataOutput);
   }
 
   @Override
   public void readFields(DataInput dataInput) throws IOException {
     useMessageSizeEncoding = dataInput.readBoolean();
-    int size = dataInput.readInt();
-    byte[] buf = new byte[size];
-    dataInput.readFully(buf);
-    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
-  }
-
-  /**
-   * Get the size of this object in serialized form.
-   *
-   * @return The size (in bytes) of serialized object
-   */
-  public int getSerializedSize() {
-    return 1 + 4 + getSize();
-  }
-
-  /**
-   * Common implementation for VertexIdMessageIterator
-   * and VertexIdMessageBytesIterator
-   */
-  public abstract class VertexIdIterator {
-    /** Reader of the serialized messages */
-    protected final ExtendedDataInput extendedDataInput =
-        configuration.createExtendedDataInput(
-            extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-    /** Current vertex id */
-    protected I vertexId;
-
-    /**
-     * Returns true if the iteration has more elements.
-     *
-     * @return True if the iteration has more elements.
-     */
-    public boolean hasNext() {
-      return extendedDataInput.available() > 0;
-    }
-    /**
-     * Moves to the next element in the iteration.
-     */
-    public abstract void next();
-
-    /**
-     * Get the current vertex id.  Ihis object's contents are only guaranteed
-     * until next() is called.  To take ownership of this object call
-     * releaseCurrentVertexId() after getting a reference to this object.
-     *
-     * @return Current vertex id
-     */
-    public I getCurrentVertexId() {
-      return vertexId;
-    }
-    /**
-     * The backing store of the current vertex id is now released.
-     * Further calls to getCurrentVertexId () without calling next()
-     * will return null.
-     *
-     * @return Current vertex id that was released
-     */
-    public I releaseCurrentVertexId() {
-      I releasedVertexId = vertexId;
-      vertexId = null;
-      return releasedVertexId;
-    }
-  }
-
-  /**
-   * Special iterator that reuses vertex ids and messages so that the
-   * lifetime of the object is only until next() is called.
-   *
-   * Vertex id ownership can be released if desired through
-   * releaseCurrentVertexId().  This optimization allows us to cut down
-   * on the number of objects instantiated and garbage collected.
-   *
-   * Not thread-safe.
-   */
-  public class VertexIdMessageIterator extends VertexIdIterator {
-    /** Current message */
-    private M message;
-
-    @Override
-    public void next() {
-      if (vertexId == null) {
-        vertexId = configuration.createVertexId();
-      }
-      if (message == null) {
-        message = configuration.createMessageValue();
-      }
-      try {
-        vertexId.readFields(extendedDataInput);
-        message.readFields(extendedDataInput);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: IOException", e);
-      }
-    }
-
-    /**
-     * Get the current message
-     *
-     * @return Current message
-     */
-    public M getCurrentMessage() {
-      return message;
-    }
+    super.readFields(dataInput);
   }
 
   /**
@@ -290,7 +147,7 @@ public class ByteArrayVertexIdMessages<I extends 
WritableComparable,
    *
    * Not thread-safe.
    */
-  public class VertexIdMessageBytesIterator extends VertexIdIterator {
+  public class VertexIdMessageBytesIterator extends VertexIdDataIterator {
     /** Last message offset */
     private int messageOffset = -1;
     /** Number of bytes in the last message */
@@ -302,7 +159,7 @@ public class ByteArrayVertexIdMessages<I extends 
WritableComparable,
     @Override
     public void next() {
       if (vertexId == null) {
-        vertexId = configuration.createVertexId();
+        vertexId = getConf().createVertexId();
       }
 
       try {
@@ -326,8 +183,7 @@ public class ByteArrayVertexIdMessages<I extends 
WritableComparable,
     public void writeCurrentMessageBytes(
         ExtendedDataOutput dataOutput) {
       try {
-        dataOutput.write(
-            extendedDataOutput.getByteArray(), messageOffset, messageBytes);
+        dataOutput.write(getByteArray(), messageOffset, messageBytes);
       } catch (IOException e) {
         throw new IllegalStateException("writeCurrentMessageBytes: Got " +
             "IOException", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
new file mode 100644
index 0000000..0c9ee07
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Common implementation for VertexIdEdgeIterator, VertexIdMessageIterator
+ * and VertexIdMessageBytesIterator.
+ *
+ * @param <I> Vertex id
+ */
+public abstract class VertexIdIterator<I extends WritableComparable> {
+  /** Reader of the serialized edges */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /** Current vertex id */
+  protected I vertexId;
+
+  /**
+   * Constructor.
+   *
+   * @param extendedDataOutput Extended data output
+   * @param configuration Configuration
+   */
+  public VertexIdIterator(
+      ExtendedDataOutput extendedDataOutput,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, ?> configuration) {
+    extendedDataInput = configuration.createExtendedDataInput(
+        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+  }
+
+  /**
+   * Returns true if the iteration has more elements.
+   *
+   * @return True if the iteration has more elements.
+   */
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+  /**
+   * Moves to the next element in the iteration.
+   */
+  public abstract void next();
+
+  /**
+   * Get the current vertex id.  Ihis object's contents are only guaranteed
+   * until next() is called.  To take ownership of this object call
+   * releaseCurrentVertexId() after getting a reference to this object.
+   *
+   * @return Current vertex id
+   */
+  public I getCurrentVertexId() {
+    return vertexId;
+  }
+  /**
+   * The backing store of the current vertex id is now released.
+   * Further calls to getCurrentVertexId () without calling next()
+   * will return null.
+   *
+   * @return Current vertex id that was released
+   */
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertexId;
+    vertexId = null;
+    return releasedVertexId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index fefe9a0..ef94645 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,10 +18,12 @@
 
 package org.apache.giraph.utils;
 
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -30,6 +32,7 @@ import org.apache.zookeeper.data.Stat;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -323,4 +326,36 @@ public class WritableUtils {
           e);
     }
   }
+
+  /**
+   * Write an edge to an output stream.
+   *
+   * @param out Data output
+   * @param edge Edge to write
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @throws IOException
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  void writeEdge(DataOutput out, Edge<I, E> edge)
+    throws IOException {
+    edge.getTargetVertexId().write(out);
+    edge.getValue().write(out);
+  }
+
+  /**
+   * Read an edge from an input stream.
+   *
+   * @param in Data input
+   * @param edge Edge to fill in-place
+   * @param <I> Vertex id
+   * @param <E> Edge value
+   * @throws IOException
+   */
+  public static <I extends WritableComparable, E extends Writable>
+  void readEdge(DataInput in, Edge<I, E> edge)
+    throws IOException {
+    edge.getTargetVertexId().readFields(in);
+    edge.getValue().readFields(in);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java 
b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
new file mode 100644
index 0000000..1e56b20
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertex.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * A vertex whose edges are backed by a byte-array.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class ByteArrayVertex<
+    I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends ByteArrayVertexBase<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ByteArrayVertex.class);
+
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    // Note that this is very expensive (deserializes all edges
+    // in an addEdge() request).
+    // Hopefully the user set all the edges in setEdges().
+    for (Edge<I, E> currentEdge : getEdges()) {
+      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
+        LOG.warn("addEdge: Vertex=" + getId() +
+            ": already added an edge value for target vertex id " +
+            edge.getTargetVertexId());
+        return false;
+      }
+    }
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public final int removeEdges(I targetVertexId) {
+    return removeFirstEdge(targetVertexId) ? 1 : 0;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java 
b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
new file mode 100644
index 0000000..26c3f62
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/vertex/ByteArrayVertexBase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.utils.ByteArrayEdges;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Common base class for byte-array backed vertices.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class ByteArrayVertexBase<
+    I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> {
+  /** Serialized edge list. */
+  private ByteArrayEdges<I, E> edges;
+
+  /**
+   * Append an edge to the serialized representation.
+   *
+   * @param edge Edge to append
+   */
+  protected void appendEdge(Edge<I, E> edge) {
+    edges.appendEdge(edge);
+  }
+
+  /**
+   * Remove the first edge pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return True if one such edge was found and removed.
+   */
+  protected boolean removeFirstEdge(I targetVertexId) {
+    return edges.removeFirstEdge(targetVertexId);
+  }
+
+  /**
+   * Remove all edges pointing to a target vertex.
+   *
+   * @param targetVertexId Target vertex id
+   * @return The number of removed edges
+   */
+  protected int removeAllEdges(I targetVertexId) {
+    return edges.removeAllEdges(targetVertexId);
+  }
+
+  @Override
+  public final void setEdges(Iterable<Edge<I, E>> edges) {
+    // If the edge iterable is backed by a byte-array,
+    // we simply get a shallow copy of it.
+    if (edges instanceof ByteArrayEdges) {
+      this.edges = new ByteArrayEdges<I, E>((ByteArrayEdges<I, E>) edges);
+    } else {
+      this.edges = new ByteArrayEdges<I, E>(getConf());
+      this.edges.setEdges(edges);
+    }
+  }
+
+  @Override
+  public final Iterable<Edge<I, E>> getEdges() {
+    return edges;
+  }
+
+  @Override
+  public final int getNumEdges() {
+    return edges.getNumEdges();
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    I vertexId = getId();
+    if (vertexId == null) {
+      vertexId = getConf().createVertexId();
+    }
+    vertexId.readFields(in);
+
+    V vertexValue = getValue();
+    if (vertexValue == null) {
+      vertexValue = getConf().createVertexValue();
+    }
+    vertexValue.readFields(in);
+
+    initialize(vertexId, vertexValue);
+
+    edges.readFields(in);
+
+    readHaltBoolean(in);
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    getId().write(out);
+    getValue().write(out);
+
+    edges.write(out);
+
+    out.writeBoolean(isHalted());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java 
b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
index 9ae692f..882bbb8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/EdgeListVertex.java
@@ -29,7 +29,7 @@ import java.util.Iterator;
  * User applications can subclass {@link EdgeListVertex}, which stores
  * the outbound edges in an ArrayList (less memory as the cost of expensive
  * random-access lookup).  Good for static graphs.  Not nearly as memory
- * efficient as using RepresentativeVertex + ByteArrayPartition
+ * efficient as using ByteArrayVertex + ByteArrayPartition
  * (probably about 10x more), but not bad when keeping vertices as objects in
  * memory (SimplePartition).
  *

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java
 
b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java
new file mode 100644
index 0000000..a50f48d
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/vertex/MultiGraphByteArrayVertex.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.vertex;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Similar to {@link ByteArrayVertex}, but allows for parallel edges.
+ *
+ * Note:  removeEdge() here removes all edges pointing to the target vertex,
+ * but returns only one of them (or null if there are no such edges).
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class MultiGraphByteArrayVertex<I extends
+    WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends ByteArrayVertexBase<I, V, E, M> {
+  @Override
+  public final boolean addEdge(Edge<I, E> edge) {
+    appendEdge(edge);
+    return true;
+  }
+
+  @Override
+  public final int removeEdges(I targetVertexId) {
+    return removeAllEdges(targetVertexId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index fa3ab49..3256a02 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -565,10 +565,8 @@ else[HADOOP_NON_SECURE]*/
     }
 
     if (getConfiguration().hasEdgeInputFormat()) {
-      // Create vertices from added edges via vertex resolver.
-      // Doing this at the beginning of superstep 0 is not enough,
-      // because we want the vertex/edge stats to be accurate.
-      workerServer.resolveMutations(graphState);
+      // Move edges from temporary storage to their source vertices.
+      getServerData().getEdgeStore().moveEdgesToVertices();
     }
 
     // Generate the partition stats for the input superstep and process

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index bdf9f57..c577cbb 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -130,7 +130,7 @@ public class EdgeInputSplitsCallable<I extends 
WritableComparable,
                 "without a value!  - " + readerEdge);
       }
 
-      graphState.getWorkerClientRequestProcessor().addEdgeRequest(
+      graphState.getWorkerClientRequestProcessor().sendEdgeRequest(
           sourceId, readerEdge);
       context.progress(); // do this before potential data transfer
       ++inputSplitEdgesLoaded;

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 2845c90..c42754f 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -103,10 +103,8 @@ public class RequestFailureTest {
     }
 
     // Send the request
-    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
-            IntWritable> request =
-        new SendWorkerMessagesRequest<IntWritable, IntWritable,
-                    IntWritable, IntWritable>(dataToSend);
+    SendWorkerMessagesRequest<IntWritable, IntWritable> request =
+        new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index d779fe4..ef1920d 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -171,10 +171,8 @@ public class RequestTest {
     }
 
     // Send the request
-    SendWorkerMessagesRequest<IntWritable, IntWritable, IntWritable,
-        IntWritable> request =
-      new SendWorkerMessagesRequest<IntWritable, IntWritable,
-            IntWritable, IntWritable>(dataToSend);
+    SendWorkerMessagesRequest<IntWritable, IntWritable> request =
+      new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java 
b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 82dc283..04c7a3c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -161,6 +161,7 @@ public class MockUtils {
   createNewServerData(ImmutableClassesGiraphConfiguration conf,
       Mapper.Context context) {
     return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+        Mockito.mock(CentralizedServiceWorker.class),
         conf,
         ByteArrayMessagesPerVertexStore.newFactory(
             MockUtils.mockServiceGetVertexPartitionOwner(1), conf),

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java 
b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
index a5a3545..11fc8a5 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMultiGraphVertex.java
@@ -45,9 +45,9 @@ public class TestMultiGraphVertex {
     public void compute(Iterable<IntWritable> messages) throws IOException { }
   }
 
-  public static class MyMultiGraphRepresentativeVertex
-      extends MultiGraphRepresentativeVertex<IntWritable, IntWritable,
-            IntWritable, IntWritable> {
+  public static class MyMultiGraphByteArrayVertex
+      extends MultiGraphByteArrayVertex<IntWritable, IntWritable,
+                  IntWritable, IntWritable> {
     @Override
     public void compute(Iterable<IntWritable> messages) throws IOException { }
   }
@@ -55,7 +55,7 @@ public class TestMultiGraphVertex {
   @Before
   public void setUp() {
     vertexClasses.add(MyMultiGraphEdgeListVertex.class);
-    vertexClasses.add(MyMultiGraphRepresentativeVertex.class);
+    vertexClasses.add(MyMultiGraphByteArrayVertex.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/giraph/blob/21232615/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java 
b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
index ca4ba1a..cbbaa1b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
+++ b/giraph-core/src/test/java/org/apache/giraph/vertex/TestMutableVertex.java
@@ -80,10 +80,10 @@ public class TestMutableVertex {
 
   /**
    * Simple instantiable class that extends
-   * {@link org.apache.giraph.vertex.RepresentativeVertex}.
+   * {@link ByteArrayVertex}.
    */
-  public static class IFDLRepresentativeVertex extends 
RepresentativeVertex<IntWritable, FloatWritable, DoubleWritable,
-                  LongWritable> {
+  public static class IFDLRepresentativeVertex extends 
ByteArrayVertex<IntWritable, FloatWritable, DoubleWritable,
+                    LongWritable> {
     @Override
     public void compute(Iterable<LongWritable> messages) throws IOException { }
   }

Reply via email to