Repository: giraph
Updated Branches:
  refs/heads/trunk e92f2942f -> 535a333b7


http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
new file mode 100644
index 0000000..b8f8c8e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdData.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores vertex ids and data associated with a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdData<I extends WritableComparable, T>
+  extends ImmutableClassesGiraphConfigurable, Writable {
+  /**
+   * Create a new data object.
+   *
+   * @return Newly-created data object.
+   */
+  T createData();
+
+  /**
+   * Write a data object to an {@link ExtendedDataOutput}.
+   *
+   * @param out  {@link ExtendedDataOutput}
+   * @param data Data object to write
+   * @throws IOException
+   */
+  void writeData(ExtendedDataOutput out, T data) throws IOException;
+
+  /**
+   * Read a data object's fields from an {@link ExtendedDataInput}.
+   *
+   * @param in   {@link ExtendedDataInput}
+   * @param data Data object to fill in-place
+   * @throws IOException
+   */
+  void readData(ExtendedDataInput in, T data) throws IOException;
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  void initialize();
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  void initialize(int expectedSize);
+
+  /**
+   * Add a vertex id and data pair to the collection.
+   *
+   * @param vertexId Vertex id
+   * @param data Data
+   */
+  void add(I vertexId, T data);
+
+  /**
+   * Add a serialized vertex id and data.
+   *
+   * @param serializedId The bye array which holds the serialized id.
+   * @param idPos The end position of the serialized id in the byte array.
+   * @param data Data
+   */
+  void add(byte[] serializedId, int idPos, T data);
+
+  /**
+   * Get the number of bytes used.
+   *
+   * @return Bytes used
+   */
+  int getSize();
+
+  /**
+   * Get the size of this object in serialized form.
+   *
+   * @return The size (in bytes) of the serialized object
+   */
+  int getSerializedSize();
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return Whether the list is empty
+   */
+  boolean isEmpty();
+
+  /**
+   * Clear the list.
+   */
+  void clear();
+
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  VertexIdDataIterator<I, T> getVertexIdDataIterator();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
new file mode 100644
index 0000000..6aea8ea
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and data objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.
+ *
+ * @param <I> vertexId type parameter
+ * @param <T> vertexData type parameter
+ */
+public interface VertexIdDataIterator<I extends WritableComparable, T>
+    extends VertexIdIterator<I> {
+  /**
+   * Get the current data.
+   *
+   * @return Current data
+   */
+  T getCurrentData();
+
+  /**
+   * Release the current data object.
+   *
+   * @return Released data object
+   */
+  T releaseCurrentData();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
new file mode 100644
index 0000000..b9c88ec
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdgeIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and edge objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdgeIterator<I extends WritableComparable,
+    E extends Writable> extends VertexIdDataIterator<I, Edge<I, E>> {
+  /**
+   * Get the current edge.
+   *
+   * @return Current edge
+   */
+  Edge<I, E> getCurrentEdge();
+
+  /**
+   * Release the current edge.
+   *
+   * @return Released edge
+   */
+  Edge<I, E> releaseCurrentEdge();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
new file mode 100644
index 0000000..8f3c03a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdEdges.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex id and out-edges of a vertex
+ *
+ * @param <I> vertexId type parameter
+ * @param <E> edge type parameter
+ */
+public interface VertexIdEdges<I extends WritableComparable,
+    E extends Writable> extends VertexIdData<I, Edge<I, E>> {
+  /**
+   * Get an iterator over the pairs.
+   *
+   * @return Iterator
+   */
+  VertexIdEdgeIterator<I, E> getVertexIdEdgeIterator();
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
index bad11d6..baaa543 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdIterator.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.utils;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.io.WritableComparable;
 
 /**
@@ -27,38 +26,18 @@ import org.apache.hadoop.io.WritableComparable;
  *
  * @param <I> Vertex id
  */
-public abstract class VertexIdIterator<I extends WritableComparable> {
-  /** Reader of the serialized edges */
-  protected final ExtendedDataInput extendedDataInput;
-
-  /** Current vertex id */
-  protected I vertexId;
-
-  /**
-   * Constructor.
-   *
-   * @param extendedDataOutput Extended data output
-   * @param configuration Configuration
-   */
-  public VertexIdIterator(
-      ExtendedDataOutput extendedDataOutput,
-      ImmutableClassesGiraphConfiguration<I, ?, ?> configuration) {
-    extendedDataInput = configuration.createExtendedDataInput(
-        extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-  }
-
+public interface VertexIdIterator<I extends WritableComparable> {
   /**
    * Returns true if the iteration has more elements.
    *
    * @return True if the iteration has more elements.
    */
-  public boolean hasNext() {
-    return extendedDataInput.available() > 0;
-  }
+  boolean hasNext();
+
   /**
    * Moves to the next element in the iteration.
    */
-  public abstract void next();
+  void next();
 
   /**
    * Get the current vertex id.  Ihis object's contents are only guaranteed
@@ -67,9 +46,8 @@ public abstract class VertexIdIterator<I extends 
WritableComparable> {
    *
    * @return Current vertex id
    */
-  public I getCurrentVertexId() {
-    return vertexId;
-  }
+  I getCurrentVertexId();
+
   /**
    * The backing store of the current vertex id is now released.
    * Further calls to getCurrentVertexId () without calling next()
@@ -77,9 +55,6 @@ public abstract class VertexIdIterator<I extends 
WritableComparable> {
    *
    * @return Current vertex id that was released
    */
-  public I releaseCurrentVertexId() {
-    I releasedVertexId = vertexId;
-    vertexId = null;
-    return releasedVertexId;
-  }
+  I releaseCurrentVertexId();
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
new file mode 100644
index 0000000..194bf69
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageBytesIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataOutput;
+
+/**
+ * Special iterator that reuses vertex ids and messages bytes so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * Vertex id ownership can be released if desired through
+ * releaseCurrentVertexId().  This optimization allows us to cut down
+ * on the number of objects instantiated and garbage collected.  Messages
+ * can only be copied to an ExtendedDataOutput object
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageBytesIterator<I extends WritableComparable,
+    M extends Writable> extends VertexIdDataIterator<I, M> {
+
+  /**
+   * Write the current message to an ExtendedDataOutput object
+   *
+   * @param dataOutput Where the current message will be written to
+   */
+  void writeCurrentMessageBytes(DataOutput dataOutput);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
new file mode 100644
index 0000000..c241cea
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Special iterator that reuses vertex ids and message objects so that the
+ * lifetime of the object is only until next() is called.
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessageIterator<I extends WritableComparable,
+    M extends Writable> extends VertexIdDataIterator<I, M> {
+  /**
+   * Get the current message.
+   *
+   * @return Current message
+   */
+  M getCurrentMessage();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
new file mode 100644
index 0000000..99e5d71
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessages.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * VertexIdMessages
+ *
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public interface VertexIdMessages<I extends WritableComparable,
+  M extends Writable> extends VertexIdData<I, M> {
+  /**
+   * Get specialized iterator that will instantiate the vertex id and
+   * message of this object.  It will only produce message bytes, not actual
+   * messages and expects a different encoding.
+   *
+   * @return Special iterator that reuses vertex ids (unless released) and
+   *         copies message bytes
+   */
+  VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator();
+
+  /**
+   * Get specialized iterator that will instiantiate the vertex id and
+   * message of this object.
+   *
+   * @return Special iterator that reuses vertex ids and messages unless
+   *         specified
+   */
+  VertexIdMessageIterator<I, M> getVertexIdMessageIterator();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 236bc88..157a543 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -27,6 +27,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.TestMessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.IntNoOpComputation;
 import org.apache.giraph.utils.MockUtils;
@@ -48,6 +49,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test all the netty failure scenarios
  */
+@SuppressWarnings("unchecked")
 public class RequestFailureTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
@@ -75,10 +77,10 @@ public class RequestFailureTest {
   private WritableRequest getRequest() {
     // Data to send
     final int partitionId = 0;
-    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+    PairList<Integer, VertexIdMessages<IntWritable,
                 IntWritable>>
         dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+        VertexIdMessages<IntWritable, IntWritable>>();
     dataToSend.initialize();
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
@@ -97,6 +99,7 @@ public class RequestFailureTest {
     // Send the request
     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
         new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+    request.setConf(conf);
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index fcdfa5c..32454f4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -36,6 +36,7 @@ import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.ByteArrayOneToAllMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
@@ -64,6 +65,7 @@ import static org.mockito.Mockito.when;
 /**
  * Test all the different netty requests.
  */
+@SuppressWarnings("unchecked")
 public class RequestTest {
   /** Configuration */
   private ImmutableClassesGiraphConfiguration conf;
@@ -141,16 +143,15 @@ public class RequestTest {
   @Test
   public void sendWorkerMessagesRequest() throws IOException {
     // Data to send
-    PairList<Integer, ByteArrayVertexIdMessages<IntWritable,
+    PairList<Integer, VertexIdMessages<IntWritable,
             IntWritable>>
-        dataToSend = new PairList<Integer,
-        ByteArrayVertexIdMessages<IntWritable, IntWritable>>();
+        dataToSend = new PairList<>();
     dataToSend.initialize();
     int partitionId = 0;
     ByteArrayVertexIdMessages<IntWritable,
             IntWritable> vertexIdMessages =
-        new ByteArrayVertexIdMessages<IntWritable, IntWritable>(
-            new TestMessageValueFactory<IntWritable>(IntWritable.class));
+        new ByteArrayVertexIdMessages<>(
+            new TestMessageValueFactory<>(IntWritable.class));
     vertexIdMessages.setConf(conf);
     vertexIdMessages.initialize();
     dataToSend.add(partitionId, vertexIdMessages);
@@ -163,7 +164,9 @@ public class RequestTest {
 
     // Send the request
     SendWorkerMessagesRequest<IntWritable, IntWritable> request =
-      new SendWorkerMessagesRequest<IntWritable, IntWritable>(dataToSend);
+      new SendWorkerMessagesRequest<>(dataToSend);
+    request.setConf(conf);
+
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -195,8 +198,8 @@ public class RequestTest {
   public void sendWorkerOneToAllMessagesRequest() throws IOException {
     // Data to send
     ByteArrayOneToAllMessages<IntWritable, IntWritable>
-        dataToSend = new ByteArrayOneToAllMessages<
-        IntWritable, IntWritable>(new 
TestMessageValueFactory<IntWritable>(IntWritable.class));
+        dataToSend = new ByteArrayOneToAllMessages<>(new
+        TestMessageValueFactory<>(IntWritable.class));
     dataToSend.setConf(conf);
     dataToSend.initialize();
     ExtendedDataOutput output = conf.createExtendedDataOutput();
@@ -208,7 +211,7 @@ public class RequestTest {
 
     // Send the request
     SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
-      new SendWorkerOneToAllMessagesRequest<IntWritable, 
IntWritable>(dataToSend, conf);
+      new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 
@@ -304,4 +307,4 @@ public class RequestTest {
     }
     assertEquals(55, keySum);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/535a333b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java 
b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 97e88f8..5d8d478 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -30,10 +30,7 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.BasicPartitionOwner;
 import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.SimplePartition;
-import org.apache.giraph.partition.SimplePartitionStore;
-import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -196,7 +193,7 @@ public class MockUtils {
     ServerData<IntWritable, IntWritable, IntWritable> serverData =
       new ServerData<IntWritable, IntWritable, IntWritable>(
       serviceWorker, conf, ByteArrayMessagesPerVertexStore.newFactory(
-        serviceWorker, conf), context);
+          serviceWorker, conf), context);
     // Here we add a partition to simulate the case that there is one 
partition.
     serverData.getPartitionStore().addPartition(new SimplePartition());
     return serverData;

Reply via email to