Added: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java?rev=1369508&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java Sun Aug 5 00:17:12 2012 @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of classes for storing messages, both in-memory and out-of-core. + */ +package org.apache.giraph.comm.messages;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Sun Aug 5 00:17:12 2012 @@ -43,14 +43,14 @@ public interface BasicVertexResolver<I e * initialize()) * @param vertex Original vertex or null if none * @param vertexChanges Changes that happened to this vertex or null if none - * @param messages messages received in the last superstep or null if none + * @param hasMessages True iff vertex received messages in the last superstep * @return Vertex to be returned, if null, and a vertex currently exists * it will be removed */ Vertex<I, V, E, M> resolve(I vertexId, Vertex<I, V, E, M> vertex, VertexChanges<I, V, E, M> vertexChanges, - Iterable<M> messages); + boolean hasMessages); /** * Create a default vertex that can be used to return from resolve(). 
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sun Aug 5 00:17:12 2012 @@ -22,6 +22,7 @@ import org.apache.giraph.bsp.Application import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.NettyWorkerClientServer; import org.apache.giraph.comm.RPCCommunications; +import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerClientServer; import org.apache.giraph.comm.WorkerServer; import org.apache.giraph.graph.partition.Partition; @@ -1172,6 +1173,8 @@ public class BspServiceWorker<I extends LOG.warn("storeCheckpoint: Removed file " + verticesFilePath); } + boolean useNetty = getConfiguration().getBoolean(GiraphJob.USE_NETTY, + GiraphJob.USE_NETTY_DEFAULT); FSDataOutputStream verticesOutputStream = getFs().create(verticesFilePath); ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream(); @@ -1179,6 +1182,12 @@ public class BspServiceWorker<I extends for (Partition<I, V, E, M> partition : workerPartitionMap.values()) { long startPos = verticesOutputStream.getPos(); partition.write(verticesOutputStream); + // write messages + verticesOutputStream.writeBoolean(useNetty); + if (useNetty) { + getServerData().getCurrentMessageStore().writePartition( + verticesOutputStream, partition.getId()); + } // Write the metadata for this partition // Format: // <index count> @@ -1211,6 +1220,18 @@ public class BspServiceWorker<I extends @Override public void loadCheckpoint(long superstep) { + if (getConfiguration().getBoolean(GiraphJob.USE_NETTY, + GiraphJob.USE_NETTY_DEFAULT)) { + try { + // clear old 
message stores + getServerData().getIncomingMessageStore().clearAll(); + getServerData().getCurrentMessageStore().clearAll(); + } catch (IOException e) { + throw new RuntimeException( + "loadCheckpoint: Failed to clear message stores ", e); + } + } + // Algorithm: // Examine all the partition owners and load the ones // that match my hostname and id from the master designated checkpoint @@ -1256,6 +1277,10 @@ public class BspServiceWorker<I extends " on " + partitionsFile); } partition.readFields(partitionsStream); + if (partitionsStream.readBoolean()) { + getServerData().getCurrentMessageStore().readFieldsForPartition( + partitionsStream, partitionId); + } partitionsStream.close(); if (LOG.isInfoEnabled()) { LOG.info("loadCheckpoint: Loaded partition " + @@ -1552,4 +1577,9 @@ public class BspServiceWorker<I extends return null; } } + + @Override + public ServerData<I, V, E, M> getServerData() { + return commService.getServerData(); + } } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Sun Aug 5 00:17:12 2012 @@ -336,6 +336,33 @@ public class GiraphJob { public static final String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/"; + /** Directory in the local file system for out-of-core messages. */ + public static final String MESSAGES_DIRECTORY = "giraph.messagesDirectory"; + /** + * Default messages directory. 
Final directory path will also have the + * job number for uniqueness + */ + public static final String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/"; + + /** Whether or not to use out-of-core messages */ + public static final String USE_OUT_OF_CORE_MESSAGES = + "giraph.useOutOfCoreMessages"; + /** Default choice about using out-of-core messaging */ + public static final boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false; + /** + * If using out-of-core messaging, it tells how many messages we keep + * in memory. + */ + public static final String MAX_MESSAGES_IN_MEMORY = + "giraph.maxMessagesInMemory"; + /** Default maximum number of messages in memory. */ + public static final int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000; + /** Size of buffer when reading and writing messages out-of-core. */ + public static final String MESSAGES_BUFFER_SIZE = + "giraph.messagesBufferSize"; + /** Default size of buffer when reading and writing messages out-of-core. */ + public static final int MESSAGES_BUFFER_SIZE_DEFAULT = 8192; + /** Keep the zookeeper output for debugging? Default is to remove it. 
*/ public static final String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData"; Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sun Aug 5 00:17:12 2012 @@ -20,6 +20,7 @@ package org.apache.giraph.graph; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStoreByPartition; import org.apache.giraph.graph.partition.Partition; import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.giraph.graph.partition.PartitionStats; @@ -580,6 +581,13 @@ public class GraphMapper<I extends Writa serviceWorker.getWorkerContext().preSuperstep(); context.progress(); + boolean useNetty = conf.getBoolean(GiraphJob.USE_NETTY, + GiraphJob.USE_NETTY_DEFAULT); + MessageStoreByPartition<I, M> messageStore = null; + if (useNetty) { + messageStore = serviceWorker.getServerData().getCurrentMessageStore(); + } + partitionStatsList.clear(); for (Partition<I, V, E, M> partition : serviceWorker.getPartitionMap().values()) { @@ -590,13 +598,27 @@ public class GraphMapper<I extends Writa // Make sure every vertex has the current // graphState before computing vertex.setGraphState(graphState); - if (vertex.isHalted() & - !Iterables.isEmpty(vertex.getMessages())) { + + Collection<M> messages = null; + if (useNetty) { + messages = messageStore.getVertexMessages(vertex.getId()); + messageStore.clearVertexMessages(vertex.getId()); + } + + boolean hasMessages = (messages != null && !messages.isEmpty()) || + !Iterables.isEmpty(vertex.getMessages()); + if (vertex.isHalted() && 
hasMessages) { vertex.wakeUp(); } if (!vertex.isHalted()) { + Iterable<M> vertexMsgIt; + if (messages == null) { + vertexMsgIt = vertex.getMessages(); + } else { + vertexMsgIt = messages; + } context.progress(); - vertex.compute(vertex.getMessages()); + vertex.compute(vertexMsgIt); vertex.releaseResources(); } if (vertex.isHalted()) { @@ -605,6 +627,11 @@ public class GraphMapper<I extends Writa partitionStats.incrVertexCount(); partitionStats.addEdgeCount(vertex.getNumEdges()); } + + if (useNetty) { + messageStore.clearPartition(partition.getId()); + } + partitionStatsList.add(partitionStats); } } while (!serviceWorker.finishSuperstep(partitionStatsList)); Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Sun Aug 5 00:17:12 2012 @@ -24,8 +24,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import com.google.common.collect.Iterables; - import java.util.List; /** @@ -53,7 +51,7 @@ public class VertexResolver<I extends Wr I vertexId, Vertex<I, V, E, M> vertex, VertexChanges<I, V, E, M> vertexChanges, - Iterable<M> messages) { + boolean hasMessages) { // Default algorithm: // 1. If the vertex exists, first prune the edges // 2. If vertex removal desired, remove the vertex. 
@@ -85,12 +83,12 @@ public class VertexResolver<I extends Wr vertex = vertexChanges.getAddedVertexList().get(0); } } - if (vertex == null && messages != null && !Iterables.isEmpty(messages)) { + if (vertex == null && hasMessages) { vertex = instantiateVertex(); vertex.initialize(vertexId, BspUtils.<V>createVertexValue(getConf()), null, - messages); + null); } } else { if ((vertexChanges != null) && Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java Sun Aug 5 00:17:12 2012 @@ -19,6 +19,7 @@ package org.apache.giraph.graph.partition; import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -50,7 +51,7 @@ public class Partition<I extends Writabl /** Partition id */ private final int id; /** Vertex map for this range (keyed by index) */ - private final Map<I, Vertex<I, V, E, M>> vertexMap = Maps.newHashMap(); + private final Map<I, Vertex<I, V, E, M>> vertexMap; /** * Constructor. 
@@ -61,6 +62,12 @@ public class Partition<I extends Writabl public Partition(Configuration conf, int id) { this.conf = conf; this.id = id; + if (conf.getBoolean(GiraphJob.USE_OUT_OF_CORE_MESSAGES, + GiraphJob.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) { + vertexMap = Maps.newTreeMap(); + } else { + vertexMap = Maps.newHashMap(); + } } /** Added: giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java?rev=1369508&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java Sun Aug 5 00:17:12 2012 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; + +/** Helper methods for Collections */ +public class CollectionUtils { + /** Do not instantiate. 
*/ + private CollectionUtils() { } + + /** + * If map already has value associated with the key it adds values to that + * value, otherwise it will put values to the map. + * + * @param key Key under which we are adding values + * @param values Values we want to add + * @param map Map which we are adding values to + * @param <K> Key + * @param <V> Value + * @param <C> Collection + * @return New value associated with the key + */ + public static <K, V, C extends Collection<V>> C addConcurrent(K key, + C values, ConcurrentMap<K, C> map) { + C currentValues = map.get(key); + if (currentValues == null) { + currentValues = map.putIfAbsent(key, values); + if (currentValues == null) { + return values; + } + } + synchronized (currentValues) { + currentValues.addAll(values); + } + return currentValues; + } +} Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Sun Aug 5 00:17:12 2012 @@ -18,6 +18,8 @@ package org.apache.giraph.comm; +import org.apache.giraph.comm.messages.SimpleMessageStore; +import org.apache.giraph.utils.MockUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Mapper.Context; @@ -47,7 +49,9 @@ public class ConnectionTest { Configuration conf = new Configuration(); ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = - new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(conf); + new ServerData<IntWritable, IntWritable, IntWritable, IntWritable> + (SimpleMessageStore.newFactory( + MockUtils.mockServiceGetVertexPartitionOwner(1), conf)); 
NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server = new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>( conf, serverData); @@ -74,7 +78,9 @@ public class ConnectionTest { Configuration conf = new Configuration(); ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = - new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(conf); + new ServerData<IntWritable, IntWritable, IntWritable, IntWritable> + (SimpleMessageStore.newFactory( + MockUtils.mockServiceGetVertexPartitionOwner(1), conf)); NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server1 = new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>( @@ -114,8 +120,9 @@ public class ConnectionTest { Configuration conf = new Configuration(); ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = - new ServerData<IntWritable, IntWritable, IntWritable, - IntWritable>(conf); + new ServerData<IntWritable, IntWritable, IntWritable, IntWritable> + (SimpleMessageStore.newFactory( + MockUtils.mockServiceGetVertexPartitionOwner(1), conf)); NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server = new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>( conf, serverData); Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Sun Aug 5 00:17:12 2012 @@ -18,11 +18,13 @@ package org.apache.giraph.comm; +import org.apache.giraph.comm.messages.SimpleMessageStore; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import 
org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexMutations; +import org.apache.giraph.utils.MockUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -91,15 +93,16 @@ public class RequestTest { // Start the service serverData = - new ServerData<IntWritable, IntWritable, IntWritable, - IntWritable>(conf); + new ServerData<IntWritable, IntWritable, IntWritable, IntWritable> + (SimpleMessageStore.newFactory( + MockUtils.mockServiceGetVertexPartitionOwner(1), conf)); server = new NettyServer<IntWritable, IntWritable, IntWritable, IntWritable>( conf, serverData); server.start(); client = - new NettyClient<IntWritable, IntWritable, IntWritable, - IntWritable>(context); + new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> + (context); client.connectAllAdddresses(Collections.singleton(server.getMyAddress())); } @@ -173,15 +176,16 @@ public class RequestTest { server.stop(); // Check the output - ConcurrentHashMap<IntWritable, Collection<IntWritable>> inVertexIdMessages = - serverData.getTransientMessages(); + Iterable<IntWritable> vertices = + serverData.getIncomingMessageStore().getDestinationVertices(); int keySum = 0; int messageSum = 0; - for (Entry<IntWritable, Collection<IntWritable>> entry : - inVertexIdMessages.entrySet()) { - keySum += entry.getKey().get(); - synchronized (entry.getValue()) { - for (IntWritable message : entry.getValue()) { + for (IntWritable vertexId : vertices) { + keySum += vertexId.get(); + Collection<IntWritable> messages = + serverData.getIncomingMessageStore().getVertexMessages(vertexId); + synchronized (messages) { + for (IntWritable message : messages) { messageSum += message.get(); } } Added: giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java?rev=1369508&view=auto 
============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java (added) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java Sun Aug 5 00:17:12 2012 @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.comm; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.BasicMessageStore; +import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition; +import org.apache.giraph.comm.messages.DiskBackedMessageStore; +import org.apache.giraph.comm.messages.FlushableMessageStore; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.SequentialFileMessageStore; +import org.apache.giraph.comm.messages.SimpleMessageStore; +import org.apache.giraph.graph.GiraphJob; +import org.apache.giraph.utils.MockUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; + +/** Test for different types of message stores */ +public class TestMessageStores { + private static String directory; + private static Configuration config; + private static TestData testData; + private static + CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, IntWritable> + service; + + @Before + public void prepare() { + directory = "test/"; + + 
Configuration.addDefaultResource("giraph-site.xml"); + config = new Configuration(); + config.setClass(GiraphJob.VERTEX_ID_CLASS, IntWritable.class, + WritableComparable.class); + config.setClass(GiraphJob.MESSAGE_VALUE_CLASS, IntWritable.class, + Writable.class); + + testData = new TestData(); + testData.maxId = 1000000; + testData.maxMessage = 1000000; + testData.maxNumberOfMessages = 100; + testData.numVertices = 50; + testData.numTimes = 10; + testData.numOfPartitions = 5; + testData.maxMessagesInMemory = 20; + + service = + MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions); + + new File(directory).mkdir(); + } + + @After + public void cleanUp() { + new File(directory).delete(); + } + + private static class TestData { + int numTimes; + int numVertices; + int maxNumberOfMessages; + int maxId; + int maxMessage; + int numOfPartitions; + int maxMessagesInMemory; + } + + private SortedMap<IntWritable, Collection<IntWritable>> createRandomMessages( + TestData testData) { + SortedMap<IntWritable, Collection<IntWritable>> allMessages = + new TreeMap<IntWritable, Collection<IntWritable>>(); + for (int v = 0; v < testData.numVertices; v++) { + int messageNum = (int) (Math.random() * testData.maxNumberOfMessages); + Collection<IntWritable> vertexMessages = Lists.newArrayList(); + for (int m = 0; m < messageNum; m++) { + vertexMessages.add( + new IntWritable((int) (Math.random() * testData.maxMessage))); + } + IntWritable vertexId = + new IntWritable((int) (Math.random() * testData.maxId)); + allMessages.put(vertexId, vertexMessages); + } + return allMessages; + } + + private void putNTimes + (MessageStore<IntWritable, IntWritable> messageStore, + Map<IntWritable, Collection<IntWritable>> messages, + TestData testData) throws IOException { + for (int n = 0; n < testData.numTimes; n++) { + SortedMap<IntWritable, Collection<IntWritable>> batch = + createRandomMessages(testData); + messageStore.addMessages(batch); + for (Entry<IntWritable, 
Collection<IntWritable>> entry : + batch.entrySet()) { + if (messages.containsKey(entry.getKey())) { + messages.get(entry.getKey()).addAll(entry.getValue()); + } else { + messages.put(entry.getKey(), entry.getValue()); + } + } + } + } + + private <I extends WritableComparable, M extends Writable> boolean + equalMessages( + MessageStore<I, M> messageStore, + Map<I, Collection<M>> expectedMessages) throws IOException { + TreeSet<I> vertexIds = Sets.newTreeSet(); + Iterables.addAll(vertexIds, messageStore.getDestinationVertices()); + for (I vertexId : vertexIds) { + Collection<M> expected = expectedMessages.get(vertexId); + if (expected == null) { + return false; + } + Collection<M> actual = messageStore.getVertexMessages(vertexId); + if (!CollectionUtils.isEqualCollection(expected, actual)) { + return false; + } + } + return true; + } + + private <S extends MessageStore<IntWritable, IntWritable>> S doCheckpoint( + MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory, + S messageStore) throws IOException { + File file = new File(directory + "messageStoreTest"); + if (file.exists()) { + file.delete(); + } + file.createNewFile(); + DataOutputStream out = new DataOutputStream(new BufferedOutputStream( + (new FileOutputStream(file)))); + messageStore.write(out); + out.close(); + + messageStore = messageStoreFactory.newStore(); + + DataInputStream in = new DataInputStream(new BufferedInputStream( + (new FileInputStream(file)))); + messageStore.readFields(in); + in.close(); + file.delete(); + + return messageStore; + } + + private <S extends MessageStore<IntWritable, IntWritable>> void + testMessageStore( + MessageStoreFactory<IntWritable, IntWritable, S> messageStoreFactory, + TestData testData) throws IOException { + SortedMap<IntWritable, Collection<IntWritable>> messages = + new TreeMap<IntWritable, Collection<IntWritable>>(); + S messageStore = messageStoreFactory.newStore(); + putNTimes(messageStore, messages, testData); + 
assertTrue(equalMessages(messageStore, messages)); + messageStore.clearAll(); + messageStore = doCheckpoint(messageStoreFactory, messageStore); + assertTrue(equalMessages(messageStore, messages)); + messageStore.clearAll(); + } + + @Test + public void testSimpleMessageStore() { + try { + testMessageStore(SimpleMessageStore.newFactory(service, config), + testData); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testDiskBackedMessageStore() { + try { + MessageStoreFactory<IntWritable, IntWritable, + BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory = + SequentialFileMessageStore.newFactory(config); + MessageStoreFactory<IntWritable, IntWritable, + FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory = + DiskBackedMessageStore.newFactory(config, fileStoreFactory); + testMessageStore(diskStoreFactory, testData); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void testDiskBackedMessageStoreByPartition() { + try { + MessageStoreFactory<IntWritable, IntWritable, + BasicMessageStore<IntWritable, IntWritable>> fileStoreFactory = + SequentialFileMessageStore.newFactory(config); + MessageStoreFactory<IntWritable, IntWritable, + FlushableMessageStore<IntWritable, IntWritable>> diskStoreFactory = + DiskBackedMessageStore.newFactory(config, fileStoreFactory); + testMessageStore(DiskBackedMessageStoreByPartition.newFactory(service, + testData.maxMessagesInMemory, diskStoreFactory), testData); + } catch (IOException e) { + e.printStackTrace(); + } + } +} Modified: giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1369508&r1=1369507&r2=1369508&view=diff ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java (original) +++ 
giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java Sun Aug 5 00:17:12 2012 @@ -18,14 +18,20 @@ package org.apache.giraph.utils; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.WorkerClientServer; -import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.partition.BasicPartitionOwner; +import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** simplify mocking for unit testing vertices */ public class MockUtils { @@ -128,4 +134,22 @@ public class MockUtils { return env; } + + public static CentralizedServiceWorker<IntWritable, IntWritable, + IntWritable, IntWritable> mockServiceGetVertexPartitionOwner(final int + numOfPartitions) { + CentralizedServiceWorker<IntWritable, IntWritable, IntWritable, + IntWritable> service = Mockito.mock(CentralizedServiceWorker.class); + Answer<PartitionOwner> answer = new Answer<PartitionOwner>() { + @Override + public PartitionOwner answer(InvocationOnMock invocation) throws + Throwable { + IntWritable vertexId = (IntWritable) invocation.getArguments()[0]; + return new BasicPartitionOwner(vertexId.get() % numOfPartitions, null); + } + }; + Mockito.when(service.getVertexPartitionOwner( + Mockito.any(IntWritable.class))).thenAnswer(answer); + return service; + } }
