Added: 
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java?rev=1369508&view=auto
==============================================================================
--- 
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java 
(added)
+++ 
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java 
Sun Aug  5 00:17:12 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of message store classes used by the communication layer.
+ */
+package org.apache.giraph.comm.messages;

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 
Sun Aug  5 00:17:12 2012
@@ -43,14 +43,14 @@ public interface BasicVertexResolver<I e
    *        initialize())
    * @param vertex Original vertex or null if none
    * @param vertexChanges Changes that happened to this vertex or null if none
-   * @param messages messages received in the last superstep or null if none
+   * @param hasMessages True iff vertex received messages in the last superstep
    * @return Vertex to be returned, if null, and a vertex currently exists
    *         it will be removed
    */
   Vertex<I, V, E, M> resolve(I vertexId,
       Vertex<I, V, E, M> vertex,
       VertexChanges<I, V, E, M> vertexChanges,
-      Iterable<M> messages);
+      boolean hasMessages);
 
   /**
    * Create a default vertex that can be used to return from resolve().

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 
Sun Aug  5 00:17:12 2012
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.Application
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.NettyWorkerClientServer;
 import org.apache.giraph.comm.RPCCommunications;
+import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.graph.partition.Partition;
@@ -1172,6 +1173,8 @@ public class BspServiceWorker<I extends 
       LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
     }
 
+    boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+        GiraphJob.USE_NETTY_DEFAULT);
     FSDataOutputStream verticesOutputStream =
         getFs().create(verticesFilePath);
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
@@ -1179,6 +1182,12 @@ public class BspServiceWorker<I extends 
     for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
+      // write messages
+      verticesOutputStream.writeBoolean(useNetty);
+      if (useNetty) {
+        getServerData().getCurrentMessageStore().writePartition(
+            verticesOutputStream, partition.getId());
+      }
       // Write the metadata for this partition
       // Format:
       // <index count>
@@ -1211,6 +1220,18 @@ public class BspServiceWorker<I extends 
 
   @Override
   public void loadCheckpoint(long superstep) {
+    if (getConfiguration().getBoolean(GiraphJob.USE_NETTY,
+        GiraphJob.USE_NETTY_DEFAULT)) {
+      try {
+        // clear old message stores
+        getServerData().getIncomingMessageStore().clearAll();
+        getServerData().getCurrentMessageStore().clearAll();
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "loadCheckpoint: Failed to clear message stores ", e);
+      }
+    }
+
     // Algorithm:
     // Examine all the partition owners and load the ones
     // that match my hostname and id from the master designated checkpoint
@@ -1256,6 +1277,10 @@ public class BspServiceWorker<I extends 
                 " on " + partitionsFile);
           }
           partition.readFields(partitionsStream);
+          if (partitionsStream.readBoolean()) {
+            getServerData().getCurrentMessageStore().readFieldsForPartition(
+                partitionsStream, partitionId);
+          }
           partitionsStream.close();
           if (LOG.isInfoEnabled()) {
             LOG.info("loadCheckpoint: Loaded partition " +
@@ -1552,4 +1577,9 @@ public class BspServiceWorker<I extends 
       return null;
     }
   }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return commService.getServerData(); // delegates to the comm service
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sun Aug  
5 00:17:12 2012
@@ -336,6 +336,33 @@ public class GiraphJob {
   public static final String CHECKPOINT_DIRECTORY_DEFAULT =
       "_bsp/_checkpoints/";
 
+  /** Directory in the local file system for out-of-core messages. */
+  public static final String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
+  /**
+   * Default messages directory. Final directory path will also have the
+   * job number for uniqueness
+   */
+  public static final String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
+
+  /** Whether or not to use out-of-core messages */
+  public static final String USE_OUT_OF_CORE_MESSAGES =
+      "giraph.useOutOfCoreMessages";
+  /** Default choice about using out-of-core messaging */
+  public static final boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
+  /**
+   * If using out-of-core messaging, the maximum number of messages to keep
+   * in memory.
+   */
+  public static final String MAX_MESSAGES_IN_MEMORY =
+      "giraph.maxMessagesInMemory";
+  /** Default maximum number of messages in memory. */
+  public static final int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
+  /** Size of buffer when reading and writing messages out-of-core. */
+  public static final String MESSAGES_BUFFER_SIZE =
+      "giraph.messagesBufferSize";
+  /** Default size of buffer when reading and writing messages out-of-core. */
+  public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+
   /** Keep the zookeeper output for debugging? Default is to remove it. */
   public static final String KEEP_ZOOKEEPER_DATA =
       "giraph.keepZooKeeperData";

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sun Aug 
 5 00:17:12 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
@@ -580,6 +581,13 @@ public class GraphMapper<I extends Writa
       serviceWorker.getWorkerContext().preSuperstep();
       context.progress();
 
+      boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY,
+          GiraphJob.USE_NETTY_DEFAULT);
+      MessageStoreByPartition<I, M> messageStore = null;
+      if (useNetty) {
+        messageStore = serviceWorker.getServerData().getCurrentMessageStore();
+      }
+
       partitionStatsList.clear();
       for (Partition<I, V, E, M> partition :
         serviceWorker.getPartitionMap().values()) {
@@ -590,13 +598,27 @@ public class GraphMapper<I extends Writa
           // Make sure every vertex has the current
           // graphState before computing
           vertex.setGraphState(graphState);
-          if (vertex.isHalted() &
-              !Iterables.isEmpty(vertex.getMessages())) {
+
+          Collection<M> messages = null;
+          if (useNetty) {
+            messages = messageStore.getVertexMessages(vertex.getId());
+            messageStore.clearVertexMessages(vertex.getId());
+          }
+
+          boolean hasMessages = (messages != null && !messages.isEmpty()) ||
+              !Iterables.isEmpty(vertex.getMessages());
+          if (vertex.isHalted() && hasMessages) {
             vertex.wakeUp();
           }
           if (!vertex.isHalted()) {
+            Iterable<M> vertexMsgIt;
+            if (messages == null) {
+              vertexMsgIt = vertex.getMessages();
+            } else {
+              vertexMsgIt = messages;
+            }
             context.progress();
-            vertex.compute(vertex.getMessages());
+            vertex.compute(vertexMsgIt);
             vertex.releaseResources();
           }
           if (vertex.isHalted()) {
@@ -605,6 +627,11 @@ public class GraphMapper<I extends Writa
           partitionStats.incrVertexCount();
           partitionStats.addEdgeCount(vertex.getNumEdges());
         }
+
+        if (useNetty) {
+          messageStore.clearPartition(partition.getId());
+        }
+
         partitionStatsList.add(partitionStats);
       }
     } while (!serviceWorker.finishSuperstep(partitionStatsList));

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Sun 
Aug  5 00:17:12 2012
@@ -24,8 +24,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Iterables;
-
 import java.util.List;
 
 /**
@@ -53,7 +51,7 @@ public class VertexResolver<I extends Wr
       I vertexId,
       Vertex<I, V, E, M> vertex,
       VertexChanges<I, V, E, M> vertexChanges,
-      Iterable<M> messages) {
+      boolean hasMessages) {
     // Default algorithm:
     // 1. If the vertex exists, first prune the edges
     // 2. If vertex removal desired, remove the vertex.
@@ -85,12 +83,12 @@ public class VertexResolver<I extends Wr
           vertex = vertexChanges.getAddedVertexList().get(0);
         }
       }
-      if (vertex == null && messages != null && !Iterables.isEmpty(messages)) {
+      if (vertex == null && hasMessages) {
         vertex = instantiateVertex();
         vertex.initialize(vertexId,
             BspUtils.<V>createVertexValue(getConf()),
             null,
-            messages);
+            null);
       }
     } else {
       if ((vertexChanges != null) &&

Modified: 
giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 
Sun Aug  5 00:17:12 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph.partition;
 
 import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -50,7 +51,7 @@ public class Partition<I extends Writabl
   /** Partition id */
   private final int id;
   /** Vertex map for this range (keyed by index) */
-  private final Map<I, Vertex<I, V, E, M>> vertexMap = Maps.newHashMap();
+  private final Map<I, Vertex<I, V, E, M>> vertexMap;
 
   /**
    * Constructor.
@@ -61,6 +62,12 @@ public class Partition<I extends Writabl
   public Partition(Configuration conf, int id) {
     this.conf = conf;
     this.id = id;
+    if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES,
+        GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
+      vertexMap = Maps.newTreeMap();
+    } else {
+      vertexMap = Maps.newHashMap();
+    }
   }
 
   /**

Added: giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java 
(added)
+++ giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java Sun 
Aug  5 00:17:12 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+/** Static helpers for working with collections. */
+public class CollectionUtils {
+  /** Utility class, never instantiated. */
+  private CollectionUtils() { }
+
+  /**
+   * Merges values into the collection the map holds for key. When the map
+   * has no entry for key yet, values itself is installed as that entry.
+   *
+   * @param key    Key whose collection is extended
+   * @param values Values to add under the key
+   * @param map    Map being updated
+   * @param <K> Key type
+   * @param <V> Element type
+   * @param <C> Collection type
+   * @return Collection now associated with the key in the map
+   */
+  public static <K, V, C extends Collection<V>> C addConcurrent(K key,
+      C values, ConcurrentMap<K, C> map) {
+    C existing = map.get(key);
+    if (existing == null) {
+      existing = map.putIfAbsent(key, values);
+    }
+    if (existing == null) {
+      return values;
+    }
+    synchronized (existing) {
+      existing.addAll(values);
+    }
+    return existing;
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Sun 
Aug  5 00:17:12 2012
@@ -18,6 +18,8 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
@@ -47,7 +49,9 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(conf);
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);
@@ -74,7 +78,9 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(conf);
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
 
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
@@ -114,8 +120,9 @@ public class ConnectionTest {
 
     Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable,
-            IntWritable>(conf);
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Sun Aug  
5 00:17:12 2012
@@ -18,11 +18,13 @@
 
 package org.apache.giraph.comm;
 
+import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
+import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -91,15 +93,16 @@ public class RequestTest {
 
     // Start the service
     serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable,
-            IntWritable>(conf);
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
     server =
         new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>(
             conf, serverData);
     server.start();
     client =
-        new NettyClient<IntWritable, IntWritable, IntWritable,
-        IntWritable>(context);
+        new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
+            (context);
     client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
   }
 
@@ -173,15 +176,16 @@ public class RequestTest {
     server.stop();
 
     // Check the output
-    ConcurrentHashMap<IntWritable, Collection<IntWritable>> inVertexIdMessages =
-        serverData.getTransientMessages();
+    Iterable<IntWritable> vertices =
+        serverData.getIncomingMessageStore().getDestinationVertices();
     int keySum = 0;
     int messageSum = 0;
-    for (Entry<IntWritable, Collection<IntWritable>> entry :
-        inVertexIdMessages.entrySet()) {
-      keySum += entry.getKey().get();
-      synchronized (entry.getValue()) {
-        for (IntWritable message : entry.getValue()) {
+    for (IntWritable vertexId : vertices) {
+      keySum += vertexId.get();
+      Collection<IntWritable> messages =
+          serverData.getIncomingMessageStore().getVertexMessages(vertexId);
+      synchronized (messages) {
+        for (IntWritable message : messages) {
           messageSum += message.get();
         }
       }

Added: giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java?rev=1369508&view=auto
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java 
(added)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java 
Sun Aug  5 00:17:12 2012
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.BasicMessageStore;
+import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
+import org.apache.giraph.comm.messages.DiskBackedMessageStore;
+import org.apache.giraph.comm.messages.FlushableMessageStore;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.SequentialFileMessageStore;
+import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.utils.MockUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/** Test for different types of message stores */
+public class TestMessageStores {
+  private static String directory;
+  private static Configuration config;
+  private static TestData testData;
+  private static
+  CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable>
+      service;
+
+  @Before
+  public void prepare() {
+    directory = "test/";
+
+    Configuration.addDefaultResource("giraph-site.xml");
+    config = new Configuration();
+    config.setClass(GiraphJob.VERTEX_ID_CLASS, IntWritable.class,
+        WritableComparable.class);
+    config.setClass(GiraphJob.MESSAGE_VALUE_CLASS, IntWritable.class,
+        Writable.class);
+
+    testData = new TestData();
+    testData.maxId = 1000000;
+    testData.maxMessage = 1000000;
+    testData.maxNumberOfMessages = 100;
+    testData.numVertices = 50;
+    testData.numTimes = 10;
+    testData.numOfPartitions = 5;
+    testData.maxMessagesInMemory = 20;
+
+    service =
+        MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions);
+
+    new File(directory).mkdir();
+  }
+
+  @After
+  public void cleanUp() {
+    new File(directory).delete();
+  }
+
+  private static class TestData {
+    int numTimes;
+    int numVertices;
+    int maxNumberOfMessages;
+    int maxId;
+    int maxMessage;
+    int numOfPartitions;
+    int maxMessagesInMemory;
+  }
+
+  /** Build one batch of random messages, keyed by random vertex id. */
+  private SortedMap<IntWritable, Collection<IntWritable>> createRandomMessages(
+      TestData testData) {
+    SortedMap<IntWritable, Collection<IntWritable>> allMessages =
+        new TreeMap<IntWritable, Collection<IntWritable>>();
+    for (int v = 0; v < testData.numVertices; v++) {
+      int messageNum = (int) (Math.random() * testData.maxNumberOfMessages);
+      Collection<IntWritable> vertexMessages = Lists.newArrayList();
+      for (int m = 0; m < messageNum; m++) {
+        vertexMessages.add(
+            new IntWritable((int) (Math.random() * testData.maxMessage)));
+      }
+      IntWritable vertexId =
+          new IntWritable((int) (Math.random() * testData.maxId));
+      allMessages.put(vertexId, vertexMessages);
+    }
+    return allMessages;
+  }
+
+  /** Add numTimes random batches to the store and to the expected map. */
+  private void putNTimes(MessageStore<IntWritable, IntWritable> messageStore,
+      Map<IntWritable, Collection<IntWritable>> messages,
+      TestData testData) throws IOException {
+    for (int n = 0; n < testData.numTimes; n++) {
+      SortedMap<IntWritable, Collection<IntWritable>> batch =
+          createRandomMessages(testData);
+      messageStore.addMessages(batch);
+      for (Entry<IntWritable, Collection<IntWritable>> entry :
+          batch.entrySet()) {
+        if (messages.containsKey(entry.getKey())) {
+          messages.get(entry.getKey()).addAll(entry.getValue());
+        } else {
+          // Copy: the store may have kept the batch's collection, and the
+          // expected map must not share (and later mutate) that instance.
+          messages.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
+        }
+      }
+    }
+  }
+
+  /** Check every vertex the store reports against the expected messages.
+   * NOTE(review): expected vertices absent from the store go unnoticed, so
+   * an empty store compares as equal - confirm this is intended. */
+  private <I extends WritableComparable, M extends Writable> boolean
+  equalMessages(MessageStore<I, M> messageStore,
+      Map<I, Collection<M>> expectedMessages) throws IOException {
+    TreeSet<I> vertexIds = Sets.newTreeSet();
+    Iterables.addAll(vertexIds, messageStore.getDestinationVertices());
+    for (I vertexId : vertexIds) {
+      Collection<M> expected = expectedMessages.get(vertexId);
+      if (expected == null) {
+        return false;
+      }
+      Collection<M> actual = messageStore.getVertexMessages(vertexId);
+      if (!CollectionUtils.isEqualCollection(expected, actual)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Write the store to a file and read it back into a fresh store. */
+  private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint(
+      MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
+      S messageStore) throws IOException {
+    File file = new File(directory + "messageStoreTest");
+    if (file.exists()) {
+      file.delete();
+    }
+    file.createNewFile();
+    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+        (new FileOutputStream(file))));
+    messageStore.write(out);
+    out.close();
+
+    messageStore = messageStoreFactory.newStore();
+
+    DataInputStream in = new DataInputStream(new BufferedInputStream(
+        (new FileInputStream(file))));
+    messageStore.readFields(in);
+    in.close();
+    file.delete();
+
+    return messageStore;
+  }
+
+  private <S extends MessageStore<IntWritable, IntWritable>> void
+  testMessageStore(
+      MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory,
+      TestData testData) throws IOException {
+    SortedMap<IntWritable, Collection<IntWritable>> messages =
+        new TreeMap<IntWritable, Collection<IntWritable>>();
+    S messageStore = messageStoreFactory.newStore();
+    putNTimes(messageStore, messages, testData);
+    assertTrue(equalMessages(messageStore, messages));
+    // NOTE(review): the store is cleared before it is checkpointed, so the
+    // round trip below appears to run on an empty store - confirm whether
+    // the checkpoint was meant to be taken before clearAll().
+    messageStore.clearAll();
+    messageStore = doCheckpoint(messageStoreFactory, messageStore);
+    assertTrue(equalMessages(messageStore, messages));
+    messageStore.clearAll();
+  }
+
+  /** Simple store; an IOException propagates and fails the test. */
+  @Test
+  public void testSimpleMessageStore() throws IOException {
+    testMessageStore(SimpleMessageStore.newFactory(service, config),
+        testData);
+  }
+
+  /** Disk backed store; an IOException propagates and fails the test. */
+  @Test
+  public void testDiskBackedMessageStore() throws IOException {
+    MessageStoreFactory<IntWritable, IntWritable,
+        BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+        SequentialFileMessageStore.newFactory(config);
+    MessageStoreFactory<IntWritable, IntWritable,
+        FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
+        DiskBackedMessageStore.newFactory(config, fileStoreFactory);
+    testMessageStore(diskStoreFactory, testData);
+  }
+
+  /** Partitioned disk store; an IOException propagates and fails the test. */
+  @Test
+  public void testDiskBackedMessageStoreByPartition() throws IOException {
+    MessageStoreFactory<IntWritable, IntWritable,
+        BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory =
+        SequentialFileMessageStore.newFactory(config);
+    MessageStoreFactory<IntWritable, IntWritable,
+        FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory =
+        DiskBackedMessageStore.newFactory(config, fileStoreFactory);
+    testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service,
+        testData.maxMessagesInMemory, diskStoreFactory), testData);
+  }
+}

Modified: giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1369508&r1=1369507&r2=1369508&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Sun Aug  
5 00:17:12 2012
@@ -18,14 +18,20 @@
 
 package org.apache.giraph.utils;
 
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.WorkerClientServer;
-import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.partition.BasicPartitionOwner;
+import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /** simplify mocking for unit testing vertices */
 public class MockUtils {
@@ -128,4 +134,22 @@ public class MockUtils {
 
         return env;
     }
+
+  /** Mock whose getVertexPartitionOwner() uses vertexId % numOfPartitions. */
+  public static CentralizedServiceWorker<IntWritable, IntWritable,
+      IntWritable, IntWritable> mockServiceGetVertexPartitionOwner(final int
+      numOfPartitions) {
+    CentralizedServiceWorker<IntWritable, IntWritable, IntWritable,
+        IntWritable> worker = Mockito.mock(CentralizedServiceWorker.class);
+    Mockito.when(worker.getVertexPartitionOwner(
+        Mockito.any(IntWritable.class))).thenAnswer(
+        new Answer<PartitionOwner>() {
+          @Override
+          public PartitionOwner answer(InvocationOnMock call) {
+            int id = ((IntWritable) call.getArguments()[0]).get();
+            return new BasicPartitionOwner(id % numOfPartitions, null);
+          }
+        });
+    return worker;
+  }
 }


Reply via email to