Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java?rev=1414361&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterable.java
 Tue Nov 27 20:01:38 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Iterable view over writables that are stored serialized in a byte array.
+ * Every call to iterator() begins a new pass over the bytes, and every
+ * next() deserializes into a newly created object instead of reusing one.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteArrayIterable<T extends Writable> implements
+    Iterable<T> {
+  /** Configuration handed to every iterator created by this iterable */
+  protected final ImmutableClassesGiraphConfiguration configuration;
+  /** Byte array holding the serialized objects */
+  protected final byte[] buf;
+  /** Position in buf at which the serialized data begins */
+  protected final int off;
+  /** Length of the serialized region, passed along with off */
+  protected final int length;
+
+  /**
+   * Constructor
+   *
+   * @param configuration Configuration used to build the data input
+   * @param buf Byte array containing the serialized objects
+   * @param off Position in buf where the serialized data starts
+   * @param length Length of the serialized data
+   */
+  public ByteArrayIterable(ImmutableClassesGiraphConfiguration configuration,
+                           byte[] buf, int off, int length) {
+    this.buf = buf;
+    this.off = off;
+    this.length = length;
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    final Iterator<T> freshObjectIterator =
+        new FreshObjectIterator(buf, off, length);
+    return freshObjectIterator;
+  }
+
+  /**
+   * Create an empty writable for the iterator to deserialize into.  The
+   * iterator asks for one per next(), so each element is a distinct object.
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+
+  /**
+   * Iterator that delegates object creation back to the enclosing iterable.
+   */
+  private class FreshObjectIterator extends ByteArrayIterator<T> {
+    /**
+     * Constructor.
+     *
+     * @param bytes Byte array to deserialize from
+     * @param start Position in bytes where reading begins
+     * @param size Length of the region to read
+     */
+    private FreshObjectIterator(byte[] bytes, int start, int size) {
+      super(configuration, bytes, start, size);
+    }
+
+    @Override
+    protected T createWritable() {
+      return ByteArrayIterable.this.createWritable();
+    }
+  }
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java?rev=1414361&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayIterator.java
 Tue Nov 27 20:01:38 2012
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This iterator is designed to deserialize a byte array on the fly to
+ * provide new copies of writable objects when desired.  It does not reuse
+ * objects, and instead creates a new one for every next().  Removal is not
+ * supported.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class ByteArrayIterator<T extends Writable> implements
+    Iterator<T> {
+  /** Data input reading the serialized objects */
+  protected final ExtendedDataInput extendedDataInput;
+
+  /**
+   * Constructor
+   *
+   * @param configuration Configuration used to create the data input
+   * @param buf Buffer
+   * @param off Offset to start in the buffer
+   * @param length Length of the buffer
+   */
+  public ByteArrayIterator(
+      ImmutableClassesGiraphConfiguration configuration,
+      byte[] buf, int off, int length) {
+    extendedDataInput =
+        configuration.createExtendedDataInput(buf, off, length);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  /**
+   * Deserialize the next object into a freshly created writable.
+   *
+   * @return New writable holding the next serialized object
+   * @throws java.util.NoSuchElementException if there is no more data
+   * @throws IllegalStateException if deserialization fails
+   */
+  @Override
+  public T next() {
+    // Iterator contract: signal exhaustion explicitly instead of letting
+    // readFields() fail on an empty stream
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "next: No more elements to read");
+    }
+    T writable = createWritable();
+    try {
+      writable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return writable;
+  }
+
+  /**
+   * Removal is not supported by this read-only iterator.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void remove() {
+    // UnsupportedOperationException is what Iterator.remove() specifies;
+    // IllegalAccessError is a JVM linkage error and must not be used here
+    throw new UnsupportedOperationException("remove: Not supported");
+  }
+
+  /**
+   * Must be able to create the writable object
+   *
+   * @return New writable
+   */
+  protected abstract T createWritable();
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java?rev=1414361&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
 Tue Nov 27 20:01:38 2012
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex id and message pairs in a single byte array
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayVertexIdMessages<I extends WritableComparable,
+    M extends Writable> implements Writable,
+    ImmutableClassesGiraphConfigurable {
+  /** Number of bytes used to encode a message size (an int) */
+  private static final int SIZE_OF_INT = 4;
+  /** Extended data output */
+  private ExtendedDataOutput extendedDataOutput;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
+  /** Add the message size to the stream? (Depends on the message store) */
+  private boolean useMessageSizeEncoding = false;
+
+  /**
+   * Constructor for reflection
+   */
+  public ByteArrayVertexIdMessages() { }
+
+  /**
+   * Set whether message sizes should be encoded.  This should only be a
+   * possibility when not combining.  When combining, all messages need to be
+   * deserialized right away, so this won't help.
+   */
+  private void setUseMessageSizeEncoding() {
+    // With a combiner the configured setting is never consulted
+    useMessageSizeEncoding =
+        !configuration.useCombiner() && configuration.useMessageSizeEncoding();
+  }
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.  The configuration must already be set.
+   */
+  public void initialize() {
+    extendedDataOutput = configuration.createExtendedDataOutput();
+    setUseMessageSizeEncoding();
+  }
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.  The configuration must already be set.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  public void initialize(int expectedSize) {
+    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
+    setUseMessageSizeEncoding();
+  }
+
+  /**
+   * Add a vertex id and message pair to the collection.  When message size
+   * encoding is on, the message bytes are preceded by their length as an int.
+   *
+   * @param vertexId Vertex id
+   * @param message Message
+   */
+  public void add(I vertexId, M message) {
+    try {
+      vertexId.write(extendedDataOutput);
+      // Write the size if configured this way, else, just the message
+      if (useMessageSizeEncoding) {
+        // Reserve room for the size, write the message, then go back and
+        // fill in how many bytes the message actually occupied
+        int sizePos = extendedDataOutput.getPos();
+        extendedDataOutput.skipBytes(SIZE_OF_INT);
+        message.write(extendedDataOutput);
+        extendedDataOutput.writeInt(
+            sizePos, extendedDataOutput.getPos() - sizePos - SIZE_OF_INT);
+      } else {
+        message.write(extendedDataOutput);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  /**
+   * Get the number of bytes used
+   *
+   * @return Number of bytes used
+   */
+  public int getSize() {
+    return extendedDataOutput.getPos();
+  }
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return True iff there are no pairs in the list
+   */
+  public boolean isEmpty() {
+    return extendedDataOutput.getPos() == 0;
+  }
+
+  /**
+   * Clear the collection.
+   */
+  public void clear() {
+    extendedDataOutput.reset();
+  }
+
+  /**
+   * Get specialized iterator that will instantiate the vertex id and
+   * message of this object.
+   *
+   * @return Special iterator that reuses vertex ids and messages unless
+   *         specified
+   */
+  public VertexIdMessageIterator getVertexIdMessageIterator() {
+    return new VertexIdMessageIterator();
+  }
+
+  /**
+   * Get specialized iterator that will instantiate the vertex id and
+   * message of this object.  It will only produce message bytes, not actual
+   * messages and expects a different encoding.
+   *
+   * @return Special iterator that reuses vertex ids (unless released) and
+   *         copies message bytes, or null if message size encoding is not
+   *         in use (message boundaries are then not recorded in the stream)
+   */
+  public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+    if (!useMessageSizeEncoding) {
+      return null;
+    }
+    return new VertexIdMessageBytesIterator();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return configuration;
+  }
+
+  /**
+   * Serialized form: the size-encoding flag, the number of data bytes, then
+   * the data bytes themselves.
+   *
+   * @param dataOutput Where to write this object
+   * @throws IOException If writing fails
+   */
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeBoolean(useMessageSizeEncoding);
+    dataOutput.writeInt(extendedDataOutput.getPos());
+    dataOutput.write(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  /**
+   * Read the form produced by {@link #write(DataOutput)}.  The configuration
+   * must be set before calling this.
+   *
+   * @param dataInput Where to read this object from
+   * @throws IOException If reading fails
+   */
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    useMessageSizeEncoding = dataInput.readBoolean();
+    int size = dataInput.readInt();
+    byte[] buf = new byte[size];
+    dataInput.readFully(buf);
+    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+  }
+
+  /**
+   * Common implementation for VertexIdMessageIterator
+   * and VertexIdMessageBytesIterator
+   */
+  public abstract class VertexIdIterator {
+    /**
+     * Backing array captured when this iterator was created.  Positions read
+     * from extendedDataInput refer to this array, so message bytes must be
+     * copied from it and not from whatever array the collection holds later.
+     */
+    protected final byte[] iteratorBuffer = extendedDataOutput.getByteArray();
+    /** Reader of the serialized messages */
+    protected final ExtendedDataInput extendedDataInput =
+        configuration.createExtendedDataInput(
+            iteratorBuffer, 0, extendedDataOutput.getPos());
+    /** Current vertex id */
+    protected I vertexId;
+
+    /**
+     * Returns true if the iteration has more elements.
+     *
+     * @return True if the iteration has more elements.
+     */
+    public boolean hasNext() {
+      return extendedDataInput.available() > 0;
+    }
+    /**
+     * Moves to the next element in the iteration.
+     */
+    public abstract void next();
+
+    /**
+     * Get the current vertex id.  This object's contents are only guaranteed
+     * until next() is called.  To take ownership of this object call
+     * releaseCurrentVertexId() after getting a reference to this object.
+     *
+     * @return Current vertex id
+     */
+    public I getCurrentVertexId() {
+      return vertexId;
+    }
+    /**
+     * The backing store of the current vertex id is now released.
+     * Further calls to getCurrentVertexId() without calling next()
+     * will return null.
+     *
+     * @return Current vertex id that was released
+     */
+    public I releaseCurrentVertexId() {
+      I releasedVertexId = vertexId;
+      vertexId = null;
+      return releasedVertexId;
+    }
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and messages so that the
+   * lifetime of the object is only until next() is called.
+   *
+   * Vertex id ownership can be released if desired through
+   * releaseCurrentVertexId().  This optimization allows us to cut down
+   * on the number of objects instantiated and garbage collected.
+   *
+   * Not thread-safe.
+   */
+  public class VertexIdMessageIterator extends VertexIdIterator {
+    /** Current message */
+    private M message;
+
+    @Override
+    public void next() {
+      // Objects are created only when missing (first call, or after the
+      // vertex id was released) and otherwise reused
+      if (vertexId == null) {
+        vertexId = configuration.createVertexId();
+      }
+      if (message == null) {
+        message = configuration.createMessageValue();
+      }
+      try {
+        vertexId.readFields(extendedDataInput);
+        message.readFields(extendedDataInput);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Get the current message
+     *
+     * @return Current message
+     */
+    public M getCurrentMessage() {
+      return message;
+    }
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and messages bytes so that the
+   * lifetime of the object is only until next() is called.
+   *
+   * Vertex id ownership can be released if desired through
+   * releaseCurrentVertexId().  This optimization allows us to cut down
+   * on the number of objects instantiated and garbage collected.  Messages
+   * can only be copied to an ExtendedDataOutput object
+   *
+   * Not thread-safe.
+   */
+  public class VertexIdMessageBytesIterator extends VertexIdIterator {
+    /** Last message offset */
+    private int messageOffset = -1;
+    /** Number of bytes in the last message */
+    private int messageBytes = -1;
+
+    /**
+     * Moves to the next element in the iteration.
+     */
+    @Override
+    public void next() {
+      if (vertexId == null) {
+        vertexId = configuration.createVertexId();
+      }
+
+      try {
+        vertexId.readFields(extendedDataInput);
+        // Size-encoded layout: int length followed by the message bytes,
+        // which are skipped here and copied on demand
+        messageBytes = extendedDataInput.readInt();
+        messageOffset = extendedDataInput.getPos();
+        if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+          throw new IllegalStateException("next: Failed to skip " +
+              messageBytes);
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Write the current message to an ExtendedDataOutput object
+     *
+     * @param dataOutput Where the current message will be written to
+     * @throws IllegalStateException if next() has not been called yet
+     */
+    public void writeCurrentMessageBytes(
+        ExtendedDataOutput dataOutput) {
+      if (messageOffset < 0) {
+        throw new IllegalStateException("writeCurrentMessageBytes: " +
+            "next() must be called first");
+      }
+      try {
+        // Copy from the array the offsets were computed against
+        dataOutput.write(iteratorBuffer, messageOffset, messageBytes);
+      } catch (IOException e) {
+        throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+            "IOException", e);
+      }
+    }
+  }
+}

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/CollectionUtils.java 
Tue Nov 27 20:01:38 2012
@@ -18,17 +18,23 @@
 
 package org.apache.giraph.utils;
 
+import com.google.common.collect.Iterables;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.log4j.Logger;
 
 /** Helper methods for Collections */
 public class CollectionUtils {
+  /** Class logger (isEqual() reports mismatches through it) */
+  private static final Logger LOG = Logger.getLogger(CollectionUtils.class);
+
   /** Do not instantiate. */
   private CollectionUtils() { }
 
   /**
-   * If map already has value associated with the key it adds values to that
-   * value, otherwise it will put values to the map.
+   * If map already has a value associated with the key it adds values to that
+   * value, otherwise it will put values to the map.  Do not reuse values.
    *
    * @param key    Key under which we are adding values
    * @param values Values we want to add
@@ -52,4 +58,61 @@ public class CollectionUtils {
     }
     return currentValues;
   }
+
+  /**
+   * Helper method to check if iterables contain the same elements with the
+   * same multiplicities, in any order.  Supports the case where the
+   * iterable next() returns a reused object.  We do assume that
+   * iterator() produces the objects in the same order across repeated calls,
+   * if the object doesn't change.  Null elements are allowed and only match
+   * null.  This is very expensive (n^2) and should be used for testing only.
+   *
+   * @param first First iterable
+   * @param second Second iterable
+   * @param <T> Type to compare
+   * @return True if equal, false otherwise
+   */
+  public static <T> boolean isEqual(Iterable<T> first, Iterable<T> second) {
+    // Relies on elements from the iterator arriving in the same order.
+    // For every element in first, check elements on the second iterable by
+    // marking the ones seen that have been found.  Then ensure that all
+    // the elements of the second have been seen as well.
+    int firstSize = Iterables.size(first);
+    int secondSize = Iterables.size(second);
+    boolean[] usedSecondArray = new boolean[secondSize];
+    Iterator<T> firstIterator = first.iterator();
+    while (firstIterator.hasNext()) {
+      T firstValue = firstIterator.next();
+      boolean foundFirstValue = false;
+      // A fresh iterator per element, since second's next() may reuse objects
+      Iterator<T> secondIterator = second.iterator();
+      for (int i = 0; i < usedSecondArray.length; ++i) {
+        T secondValue = secondIterator.next();
+        if (!usedSecondArray[i]) {
+          // Null-safe comparison so null elements do not cause an NPE
+          boolean matches = (firstValue == null) ?
+              secondValue == null : firstValue.equals(secondValue);
+          if (matches) {
+            usedSecondArray[i] = true;
+            foundFirstValue = true;
+            break;
+          }
+        }
+      }
+
+      if (!foundFirstValue) {
+        LOG.error("isEqual: Couldn't find element from first (" + firstValue +
+            ") in second " + second + " (size=" + secondSize + ")");
+        return false;
+      }
+    }
+
+    Iterator<T> secondIterator = second.iterator();
+    for (int i = 0; i < usedSecondArray.length; ++i) {
+      T secondValue = secondIterator.next();
+      if (!usedSecondArray[i]) {
+        LOG.error("isEqual: Element " + secondValue + " (index " + i +
+            ") in second " + second + " (size=" + secondSize +
+            ") not found in " + first + " (size=" + firstSize + ")");
+        return false;
+      }
+    }
+
+    return true;
+  }
 }

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java 
(original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/EmptyIterable.java 
Tue Nov 27 20:01:38 2012
@@ -27,6 +27,20 @@ import java.util.NoSuchElementException;
  * @param <M> Message data
  */
 public class EmptyIterable<M> implements Iterable<M>, Iterator<M> {
+  /** Singleton empty iterable, shared by all callers of emptyIterable() */
+  private static final EmptyIterable<Object> EMPTY_ITERABLE =
+      new EmptyIterable<Object>();
+
+  /**
+   * Get the singleton empty iterable
+   *
+   * @param <T> Type of the empty iterable
+   * @return Empty singleton iterable
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Iterable<T> emptyIterable() {
+    // Unchecked cast suppressed on this method only: an EmptyIterable is
+    // expected to yield no elements, so no T is ever read through the cast
+    return (Iterable<T>) EMPTY_ITERABLE;
+  }
+
   @Override
   public Iterator<M> iterator() {
     return this;

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
 Tue Nov 27 20:01:38 2012
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * Adds some functionality to ByteArrayOutputStream,
@@ -140,6 +141,14 @@ public class ExtendedByteArrayDataOutput
   }
 
   @Override
+  public void skipBytes(int bytesToSkip) {
+    if ((count + bytesToSkip) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+    }
+    count += bytesToSkip;
+  }
+
+  @Override
   public void writeInt(int position, int value) {
     if (position + 4 > count) {
       throw new IndexOutOfBoundsException(

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
 Tue Nov 27 20:01:38 2012
@@ -24,6 +24,13 @@ import java.io.DataOutput;
  */
 public interface ExtendedDataOutput extends DataOutput {
   /**
+   * Advance the write position by the given number of bytes without
+   * writing them.  This lets a caller reserve space (for example, for a size
+   * field) and fill it in later with {@link #writeInt(int, int)}.  The
+   * implementations in this package grow their buffer as needed and do not
+   * clear the skipped bytes.
+   *
+   * @param  bytesToSkip Number of bytes to skip (expected to be non-negative)
+   */
+  void skipBytes(int bytesToSkip);
+
+  /**
    * In order to write a size as a first part of an data output, it is
    * useful to be able to write an int at an arbitrary location in the stream
    *

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java?rev=1414361&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterable.java
 Tue Nov 27 20:01:38 2012
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Iterable whose iterators hand out a single reused "representative"
+ * object: whatever next() returns is only valid until the following call to
+ * next().
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteArrayIterable<T extends Writable>
+    extends ByteArrayIterable<T> {
+  /**
+   * Constructor
+   *
+   * @param configuration Configuration used to build the data input
+   * @param buf Byte array containing the serialized objects
+   * @param off Position in buf where the serialized data starts
+   * @param length Length of the serialized data
+   */
+  public RepresentativeByteArrayIterable(
+      ImmutableClassesGiraphConfiguration configuration,
+      byte[] buf, int off, int length) {
+    super(configuration, buf, off, length);
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    final Iterator<T> reusingIterator = new ReusingIterator(buf, off, length);
+    return reusingIterator;
+  }
+
+  /**
+   * Iterator over the internal byte array that delegates createWritable()
+   * to the enclosing iterable.
+   */
+  private class ReusingIterator extends RepresentativeByteArrayIterator<T> {
+    /**
+     * Constructor.
+     *
+     * @param bytes Byte array to deserialize from
+     * @param start Position in bytes where reading begins
+     * @param size Length of the region to read
+     */
+    private ReusingIterator(byte[] bytes, int start, int size) {
+      super(configuration, bytes, start, size);
+    }
+
+    @Override
+    protected T createWritable() {
+      return RepresentativeByteArrayIterable.this.createWritable();
+    }
+  }
+}

Added: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java?rev=1414361&view=auto
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
 (added)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/RepresentativeByteArrayIterator.java
 Tue Nov 27 20:01:38 2012
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The objects provided by this iterator have lifetimes only until next() is
+ * called.  In that sense, the object provided is only a representative object.
+ *
+ * @param <T> Type that extends Writable that will be iterated
+ */
+public abstract class RepresentativeByteArrayIterator<T extends
+    Writable> extends ByteArrayIterator<T> {
+  /**
+   * Representative writable.  It is created lazily on the first next() so
+   * that the abstract createWritable() is not invoked while this object (and
+   * any subclass state it may depend on) is still being constructed.
+   */
+  private T representativeWritable;
+
+  /**
+   * Constructor
+   *
+   * @param configuration Configuration
+   * @param buf buffer to read from
+   * @param off Offset into the buffer to start from
+   * @param length Length of the buffer
+   */
+  public RepresentativeByteArrayIterator(
+      ImmutableClassesGiraphConfiguration configuration,
+      byte[] buf, int off, int length) {
+    super(configuration, buf, off, length);
+  }
+
+  /**
+   * Deserialize the next object into the shared representative writable.
+   * The returned object is overwritten by the following call to next().
+   *
+   * @return The representative writable holding the next object
+   * @throws java.util.NoSuchElementException if there is no more data
+   * @throws IllegalStateException if deserialization fails
+   */
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "next: No more elements to read");
+    }
+    if (representativeWritable == null) {
+      representativeWritable = createWritable();
+    }
+    try {
+      representativeWritable.readFields(extendedDataInput);
+    } catch (IOException e) {
+      throw new IllegalStateException("next: readFields got IOException", e);
+    }
+    return representativeWritable;
+  }
+}

Modified: 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
 (original)
+++ 
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/UnsafeByteArrayOutputStream.java
 Tue Nov 27 20:01:38 2012
@@ -212,6 +212,14 @@ public class UnsafeByteArrayOutputStream
   }
 
   @Override
+  public void skipBytes(int bytesToSkip) {
+    if ((pos + bytesToSkip) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + bytesToSkip));
+    }
+    pos += bytesToSkip;
+  }
+
+  @Override
   public void writeInt(int pos, int value) {
     if (pos + SIZE_OF_INT > this.pos) {
       throw new IndexOutOfBoundsException(

Modified: 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
 (original)
+++ 
giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
 Tue Nov 27 20:01:38 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm;
 
 import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.Collection;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.netty.NettyClient;
@@ -30,7 +29,7 @@ import org.apache.giraph.comm.requests.S
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
@@ -82,14 +81,14 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
-            IntWritable>>
+    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+                IntWritable>>
         dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
     dataToSend.initialize();
-    ByteArrayVertexIdMessageCollection<IntWritable,
-        IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+    ByteArrayVertexIdMessages<IntWritable,
+            IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -116,7 +115,7 @@ public class RequestFailureTest {
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
-      Collection<IntWritable> messages =
+      Iterable<IntWritable> messages =
           serverData.getIncomingMessageStore().getVertexMessages(vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov 27 20:01:38 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,7 +39,7 @@ import org.apache.giraph.graph.VertexMut
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionStore;
-import org.apache.giraph.utils.ByteArrayVertexIdMessageCollection;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.giraph.utils.PairList;
 import org.apache.hadoop.io.IntWritable;
@@ -144,15 +143,15 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    PairList<Integer, ByteArrayVertexIdMessageCollection<IntWritable,
-        IntWritable>>
+    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+            IntWritable>>
         dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>>();
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
     dataToSend.initialize();
     int partitionId = 0;
-    ByteArrayVertexIdMessageCollection<IntWritable,
-        IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessageCollection<IntWritable, IntWritable>();
+    ByteArrayVertexIdMessages<IntWritable,
+            IntWritable> vertexIdMessages =
+        new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -182,7 +181,7 @@ public class RequestTest {
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
-      Collection<IntWritable> messages =
+      Iterable<IntWritable> messages =
           serverData.getIncomingMessageStore().getVertexMessages(vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java Tue Nov 27 20:01:38 2012
@@ -18,19 +18,38 @@
 
 package org.apache.giraph.comm;
 
-import org.apache.commons.collections.CollectionUtils;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.BasicMessageStore;
-import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
-import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
 import org.apache.giraph.comm.messages.FlushableMessageStore;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
 import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.CollectionUtils;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -42,25 +61,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
 /** Test for different types of message stores */
 public class TestMessageStores {
   private static String directory;
@@ -69,6 +69,11 @@ public class TestMessageStores {
   private static
   CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable>
       service;
+  /**
+   * Pseudo-random number generator with a fixed seed, so every run
+   * generates the same test data (helps with debugging)
+   */
+  private static final Random RANDOM = new Random(101);
 
   private static class IntVertex extends EdgeListVertex<IntWritable,
       IntWritable, IntWritable, IntWritable> {
@@ -122,27 +127,68 @@ public class TestMessageStores {
     SortedMap<IntWritable, Collection<IntWritable>> allMessages =
         new TreeMap<IntWritable, Collection<IntWritable>>();
     for (int v = 0; v < testData.numVertices; v++) {
-      int messageNum = (int) (Math.random() * testData.maxNumberOfMessages);
+      int messageNum =
+          (int) (RANDOM.nextFloat() * testData.maxNumberOfMessages);
       Collection<IntWritable> vertexMessages = Lists.newArrayList();
       for (int m = 0; m < messageNum; m++) {
         vertexMessages.add(
-            new IntWritable((int) (Math.random() * testData.maxMessage)));
+            new IntWritable((int) (RANDOM.nextFloat() * testData.maxMessage)));
       }
       IntWritable vertexId =
-          new IntWritable((int) (Math.random() * testData.maxId));
+          new IntWritable((int) (RANDOM.nextFloat() * testData.maxId));
       allMessages.put(vertexId, vertexMessages);
     }
     return allMessages;
   }
 
-  private void putNTimes
-      (MessageStore<IntWritable, IntWritable> messageStore,
-          Map<IntWritable, Collection<IntWritable>> messages,
-          TestData testData) throws IOException {
+  /**
+   * Message store pre-populated from an in-memory map of destination vertex
+   * id to messages. Used for testing only: putNTimes wraps each random batch
+   * in one of these and passes it to {@code MessageStore#addMessages}.
+   */
+  private static class InputMessageStore extends
+      ByteArrayMessagesPerVertexStore<IntWritable, IntWritable> {
+
+    /**
+     * Constructor. Copies every message in {@code inputMap} into this store.
+     *
+     * @param service  Service worker
+     * @param config   Hadoop configuration
+     * @param inputMap Messages to store, keyed by destination vertex id
+     * @throws IOException as declared; an IOException from adding a
+     *     partition's messages is instead wrapped in IllegalStateException
+     */
+    InputMessageStore(
+        CentralizedServiceWorker<IntWritable, ?, ?, IntWritable> service,
+        ImmutableClassesGiraphConfiguration<IntWritable, ?, ?,
+            IntWritable> config,
+        Map<IntWritable, Collection<IntWritable>> inputMap)
+        throws IOException {
+      super(service, config);
+      // Build one ByteArrayVertexIdMessages batch per vertex and add it to
+      // the partition returned by getPartitionId for that vertex.
+      for (Map.Entry<IntWritable, Collection<IntWritable>> entry :
+          inputMap.entrySet()) {
+        int partitionId = getPartitionId(entry.getKey());
+        ByteArrayVertexIdMessages<IntWritable, IntWritable>
+            byteArrayVertexIdMessages =
+            new ByteArrayVertexIdMessages<IntWritable, IntWritable>();
+        byteArrayVertexIdMessages.setConf(config);
+        byteArrayVertexIdMessages.initialize();
+        for (IntWritable message : entry.getValue()) {
+          byteArrayVertexIdMessages.add(entry.getKey(), message);
+        }
+        try {
+          addPartitionMessages(partitionId, byteArrayVertexIdMessages);
+        } catch (IOException e) {
+          throw new IllegalStateException("Got IOException", e);
+        }
+      }
+    }
+  }
+
+  private void putNTimes(
+      MessageStore<IntWritable, IntWritable> messageStore,
+      Map<IntWritable, Collection<IntWritable>> messages,
+      TestData testData) throws IOException {
     for (int n = 0; n < testData.numTimes; n++) {
       SortedMap<IntWritable, Collection<IntWritable>> batch =
           createRandomMessages(testData);
-      messageStore.addMessages(batch);
+      messageStore.addMessages(new InputMessageStore(service, config,
+          batch));
       for (Entry<IntWritable, Collection<IntWritable>> entry :
           batch.entrySet()) {
         if (messages.containsKey(entry.getKey())) {
@@ -161,12 +207,14 @@ public class TestMessageStores {
     TreeSet<I> vertexIds = Sets.newTreeSet();
     Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
     for (I vertexId : vertexIds) {
-      Collection<M> expected = expectedMessages.get(vertexId);
+      Iterable<M> expected = expectedMessages.get(vertexId);
       if (expected == null) {
         return false;
       }
-      Collection<M> actual = messageStore.getVertexMessages(vertexId);
-      if (!CollectionUtils.isEqualCollection(expected, actual)) {
+      Iterable<M> actual = messageStore.getVertexMessages(vertexId);
+      if (!CollectionUtils.isEqual(expected, actual)) {
+        System.err.println("equalMessages: For vertexId " + vertexId +
+            " expected " + expected + ", but got " + actual);
         return false;
       }
     }
@@ -213,10 +261,10 @@ public class TestMessageStores {
   }
 
   @Test
-  public void testCollectionOfMessagesPeVertexStore() {
+  public void testByteArrayMessagesPerVertexStore() {
     try {
       testMessageStore(
-          CollectionOfMessagesPerVertexStore.newFactory(service, config),
+          ByteArrayMessagesPerVertexStore.newFactory(service, config),
           testData);
     } catch (IOException e) {
       e.printStackTrace();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1414361&r1=1414360&r2=1414361&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Tue Nov 27 20:01:38 2012
@@ -22,7 +22,7 @@ import org.apache.giraph.ImmutableClasse
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.partition.BasicPartitionOwner;
@@ -162,7 +162,7 @@ public class MockUtils {
       Mapper.Context context) {
     return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
         conf,
-        CollectionOfMessagesPerVertexStore.newFactory(
+        ByteArrayMessagesPerVertexStore.newFactory(
             MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
         context);
   }


Reply via email to