Repository: giraph Updated Branches: refs/heads/trunk 6256a761d -> c94dd9c74
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index 1cd1bd6..c18c686 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -218,16 +218,11 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, vertexIdMessages.initialize(); for (I vertexId : messageStore.getPartitionDestinationVertices(partitionId)) { - try { - // Messages cannot be re-used from this iterable, but add() - // serializes the message, making this safe - Iterable<Writable> messages = messageStore.getVertexMessages(vertexId); - for (Writable message : messages) { - vertexIdMessages.add(vertexId, message); - } - } catch (IOException e) { - throw new IllegalStateException( - "sendPartitionMessages: Got IOException ", e); + // Messages cannot be re-used from this iterable, but add() + // serializes the message, making this safe + Iterable<Writable> messages = messageStore.getVertexMessages(vertexId); + for (Writable message : messages) { + vertexIdMessages.add(vertexId, message); } if (vertexIdMessages.getSize() > maxMessagesSizePerWorker) { WritableRequest messagesRequest = @@ -247,13 +242,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, partitionId, vertexIdMessages); doRequest(workerInfo, messagesRequest); } - try { - messageStore.clearPartition(partitionId); - } catch (IOException e) { - throw new IllegalStateException( - "sendPartitionMessages: Got IOException while removing messages " + - "for partition " + partitionId + " :" + e); - } + messageStore.clearPartition(partitionId); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java index b59d0cf..3d4f76e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java @@ -85,12 +85,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable, @Override public void doRequest(ServerData<I, V, E> serverData) { - try { - serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId, - vertexIdMessageMap); - } catch (IOException e) { - throw new RuntimeException("doRequest: Got IOException ", e); - } + serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId, + vertexIdMessageMap); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java index 6953998..1f09739 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java @@ -18,8 +18,6 @@ package org.apache.giraph.comm.requests; -import java.io.IOException; - import org.apache.giraph.comm.ServerData; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.PairList; @@ -70,13 +68,9 @@ public class SendWorkerMessagesRequest<I extends WritableComparable, iterator = partitionVertexData.getIterator(); while (iterator.hasNext()) { iterator.next(); - try { - serverData.getIncomingMessageStore(). - addPartitionMessages(iterator.getCurrentFirst(), - iterator.getCurrentSecond()); - } catch (IOException e) { - throw new RuntimeException("doRequest: Got IOException ", e); - } + serverData.getIncomingMessageStore(). + addPartitionMessages(iterator.getCurrentFirst(), + iterator.getCurrentSecond()); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java index f8d0473..bdaa871 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java @@ -93,64 +93,60 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable, @Override public void doRequest(ServerData serverData) { - try { - MessageStore<I, M> messageStore = serverData.getIncomingMessageStore(); - if (messageStore.isPointerListEncoding()) { - // if message store is pointer list based then send data as is - messageStore.addPartitionMessages(-1, oneMessageToManyIds); - } else { // else split the data per partition and send individually - CentralizedServiceWorker<I, ?, ?> serviceWorker = - serverData.getServiceWorker(); - // Get the initial size of ByteArrayVertexIdMessages per partition - // on this worker. To make sure every ByteArrayVertexIdMessages to have - // enough space to store the messages, we divide the original - // ByteArrayOneMessageToManyIds message size by the number of partitions - // and double the size - // (Assume the major component in ByteArrayOneMessageToManyIds message - // is a id list. Now each target id has a copy of message, - // therefore we double the buffer size) - // to get the initial size of ByteArrayVertexIdMessages. - int initialSize = oneMessageToManyIds.getSize() / - serverData.getPartitionStore().getNumPartitions() * 2; - // Create ByteArrayVertexIdMessages for - // message reformatting. - Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs = - new Int2ObjectOpenHashMap<>(); + MessageStore<I, M> messageStore = serverData.getIncomingMessageStore(); + if (messageStore.isPointerListEncoding()) { + // if message store is pointer list based then send data as is + messageStore.addPartitionMessages(-1, oneMessageToManyIds); + } else { // else split the data per partition and send individually + CentralizedServiceWorker<I, ?, ?> serviceWorker = + serverData.getServiceWorker(); + // Get the initial size of ByteArrayVertexIdMessages per partition + // on this worker. To make sure every ByteArrayVertexIdMessages to have + // enough space to store the messages, we divide the original + // ByteArrayOneMessageToManyIds message size by the number of partitions + // and double the size + // (Assume the major component in ByteArrayOneMessageToManyIds message + // is a id list. Now each target id has a copy of message, + // therefore we double the buffer size) + // to get the initial size of ByteArrayVertexIdMessages. + int initialSize = oneMessageToManyIds.getSize() / + serverData.getPartitionStore().getNumPartitions() * 2; + // Create ByteArrayVertexIdMessages for + // message reformatting. + Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs = + new Int2ObjectOpenHashMap<>(); - // Put data from ByteArrayOneMessageToManyIds - // to ByteArrayVertexIdMessages - VertexIdMessageIterator<I, M> vertexIdMessageIterator = - oneMessageToManyIds.getVertexIdMessageIterator(); - while (vertexIdMessageIterator.hasNext()) { - vertexIdMessageIterator.next(); - M msg = vertexIdMessageIterator.getCurrentMessage(); - I vertexId = vertexIdMessageIterator.getCurrentVertexId(); - PartitionOwner owner = - serviceWorker.getVertexPartitionOwner(vertexId); - int partitionId = owner.getPartitionId(); - ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs - .get(partitionId); - if (idMsgs == null) { - idMsgs = new ByteArrayVertexIdMessages<>( - getConf().<M>createOutgoingMessageValueFactory()); - idMsgs.setConf(getConf()); - idMsgs.initialize(initialSize); - partitionIdMsgs.put(partitionId, idMsgs); - } - idMsgs.add(vertexId, msg); + // Put data from ByteArrayOneMessageToManyIds + // to ByteArrayVertexIdMessages + VertexIdMessageIterator<I, M> vertexIdMessageIterator = + oneMessageToManyIds.getVertexIdMessageIterator(); + while (vertexIdMessageIterator.hasNext()) { + vertexIdMessageIterator.next(); + M msg = vertexIdMessageIterator.getCurrentMessage(); + I vertexId = vertexIdMessageIterator.getCurrentVertexId(); + PartitionOwner owner = + serviceWorker.getVertexPartitionOwner(vertexId); + int partitionId = owner.getPartitionId(); + ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs + .get(partitionId); + if (idMsgs == null) { + idMsgs = new ByteArrayVertexIdMessages<>( + getConf().<M>createOutgoingMessageValueFactory()); + idMsgs.setConf(getConf()); + idMsgs.initialize(initialSize); + partitionIdMsgs.put(partitionId, idMsgs); } + idMsgs.add(vertexId, msg); + } - // Read ByteArrayVertexIdMessages and write to message store - for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs : - partitionIdMsgs.entrySet()) { - if (!idMsgs.getValue().isEmpty()) { - serverData.getIncomingMessageStore().addPartitionMessages( - idMsgs.getKey(), idMsgs.getValue()); - } + // Read ByteArrayVertexIdMessages and write to message store + for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs : + partitionIdMsgs.entrySet()) { + if (!idMsgs.getValue().isEmpty()) { + serverData.getIncomingMessageStore().addPartitionMessages( + idMsgs.getKey(), idMsgs.getValue()); } } - } catch (IOException e) { - throw new IllegalStateException("doRequest: Got IOException ", e); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 17e030f..da0a453 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -962,28 +962,6 @@ public interface GiraphConstants { "This directory has/stores the available checkpoint files in HDFS."); /** - * Comma-separated list of directories in the local file system for - * out-of-core messages. - */ - StrConfOption MESSAGES_DIRECTORY = - new StrConfOption("giraph.messagesDirectory", "_bsp/_messages/", - "Comma-separated list of directories in the local file system for " + - "out-of-core messages."); - - /** - * If using out-of-core messaging, it tells how much messages do we keep - * in memory. - */ - IntConfOption MAX_MESSAGES_IN_MEMORY = - new IntConfOption("giraph.maxMessagesInMemory", 1000000, - "If using out-of-core messaging, it tells how much messages do we " + - "keep in memory."); - /** Size of buffer when reading and writing messages out-of-core. */ - IntConfOption MESSAGES_BUFFER_SIZE = - new IntConfOption("giraph.messagesBufferSize", 8 * ONE_KB, - "Size of buffer when reading and writing messages out-of-core."); - - /** * Comma-separated list of directories in the local filesystem for * out-of-core partitions. */ http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java index 9e56d99..94ba83a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java @@ -106,17 +106,17 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { return messageStore.getVertexMessages(vertexId); } @Override - public void clearVertexMessages(I vertexId) throws IOException { + public void clearVertexMessages(I vertexId) { messageStore.clearVertexMessages(vertexId); } @Override - public void clearAll() throws IOException { + public void clearAll() { messageStore.clearAll(); } @@ -132,7 +132,7 @@ public class DiskBackedMessageStore<I extends WritableComparable, @Override public void addPartitionMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { + int partitionId, VertexIdMessages<I, M> messages) { if (useMessageCombiner) { messageStore.addPartitionMessages(partitionId, messages); } else { @@ -193,7 +193,7 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { messageStore.clearPartition(partitionId); } @@ -302,11 +302,6 @@ public class DiskBackedMessageStore<I extends WritableComparable, protected void addEntryToImMemoryPartitionData(int partitionId, VertexIdMessages<I, M> messages) { - try { - messageStore.addPartitionMessages(partitionId, messages); - } catch (IOException e) { - throw new IllegalStateException("Caught IOException while adding a new " + - "message to in-memory message store"); - } + messageStore.addPartitionMessages(partitionId, messages); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java index de2ffd4..fbe38ac 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java @@ -18,21 +18,16 @@ package org.apache.giraph.partition; -import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import javax.annotation.concurrent.ThreadSafe; -import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.Vertex; -import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -64,11 +59,7 @@ public class SimplePartition<I extends WritableComparable, @Override public void initialize(int partitionId, Progressable progressable) { super.initialize(partitionId, progressable); - if (shouldTraverseMessageInOrder()) { - vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>(); - } else { - vertexMap = Maps.newConcurrentMap(); - } + vertexMap = Maps.newConcurrentMap(); } @Override @@ -145,11 +136,7 @@ public class SimplePartition<I extends WritableComparable, @Override public void readFields(DataInput input) throws IOException { super.readFields(input); - if (shouldTraverseMessageInOrder()) { - vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>(); - } else { - vertexMap = Maps.newConcurrentMap(); - } + vertexMap = Maps.newConcurrentMap(); int vertices = input.readInt(); for (int i = 0; i < vertices; ++i) { progress(); @@ -177,20 +164,4 @@ public class SimplePartition<I extends WritableComparable, public Iterator<Vertex<I, V, E>> iterator() { return vertexMap.values().iterator(); } - - /** - * This method specifies if the message store factory, that is been - * configured, has requirement of traversing messages in order. - * - * @return true if the message store factory has specified traversing - * messages in ordered, else return false. - */ - private boolean shouldTraverseMessageInOrder() { - Class<? extends MessageStoreFactory> messageStoreFactoryClass = - MESSAGE_STORE_FACTORY_CLASS.get(getConf()); - - MessageStoreFactory messageStoreFactoryInstance = - ReflectionUtils.newInstance(messageStoreFactoryClass); - return messageStoreFactoryInstance.shouldTraverseMessagesInOrder(); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index 22594fc..37f0c4b 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -103,7 +103,7 @@ public class RequestFailureTest { return request; } - private void checkResult(int numRequests) throws IOException { + private void checkResult(int numRequests) { // Check the output Iterable<IntWritable> vertices = serverData.getIncomingMessageStore().getPartitionDestinationVertices(0); http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index 2d86aee..5db0b79 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -80,7 +80,7 @@ public class RequestTest { private WorkerInfo workerInfo; @Before - public void setUp() throws IOException { + public void setUp() { // Setup the conf GiraphConfiguration tmpConf = new GiraphConfiguration(); GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class); @@ -108,7 +108,7 @@ public class RequestTest { } @Test - public void sendVertexPartition() throws IOException { + public void sendVertexPartition() { // Data to send int partitionId = 13; Partition<IntWritable, IntWritable, IntWritable> partition = @@ -145,7 +145,7 @@ public class RequestTest { } @Test - public void sendWorkerMessagesRequest() throws IOException { + public void sendWorkerMessagesRequest() { // Data to send PairList<Integer, VertexIdMessages<IntWritable, IntWritable>> @@ -244,7 +244,7 @@ public class RequestTest { } @Test - public void sendPartitionMutationsRequest() throws IOException { + public void sendPartitionMutationsRequest() { // Data to send int partitionId = 19; Map<IntWritable, VertexMutations<IntWritable, IntWritable, http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java index e9f5f92..771a37c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/TestMessageStores.java @@ -36,18 +36,13 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.commons.io.FileUtils; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore; import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore; -import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore; -import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore; import org.apache.giraph.conf.DefaultMessageClasses; import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.DefaultMessageValueFactory; import org.apache.giraph.factories.TestMessageValueFactory; @@ -83,14 +78,10 @@ public class TestMessageStores { private static final Random RANDOM = new Random(101); @Before - public void prepare() throws IOException { - directory = Files.createTempDir(); - + public void prepare() { Configuration.addDefaultResource("giraph-site.xml"); GiraphConfiguration initConfig = new GiraphConfiguration(); initConfig.setComputationClass(IntNoOpComputation.class); - GiraphConstants.MESSAGES_DIRECTORY.set( - initConfig, new File(directory, "giraph_messages").toString()); config = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, IntWritable>(initConfig); @@ -108,8 +99,7 @@ public class TestMessageStores { } @After - public void cleanUp() throws IOException { - FileUtils.deleteDirectory(directory); + public void cleanUp() { } private static class TestData { @@ -145,7 +135,7 @@ public class TestMessageStores { MessageStore<IntWritable, IntWritable> messageStore, CentralizedServiceWorker<IntWritable, ?, ?> service, ImmutableClassesGiraphConfiguration<IntWritable, ?, ?> config, - Map<IntWritable, Collection<IntWritable>> inputMap) throws IOException { + Map<IntWritable, Collection<IntWritable>> inputMap) { for (Map.Entry<IntWritable, Collection<IntWritable>> entry : inputMap.entrySet()) { int partitionId = @@ -166,7 +156,7 @@ public class TestMessageStores { private void putNTimes( MessageStore<IntWritable, IntWritable> messageStore, Map<IntWritable, Collection<IntWritable>> messages, - TestData testData) throws IOException { + TestData testData) { for (int n = 0; n < testData.numTimes; n++) { SortedMap<IntWritable, Collection<IntWritable>> batch = createRandomMessages(testData); @@ -186,7 +176,7 @@ public class TestMessageStores { equalMessages( MessageStore<I, M> messageStore, Map<I, Collection<M>> expectedMessages, - TestData testData) throws IOException { + TestData testData) { for (int partitionId = 0; partitionId < testData.numOfPartitions; partitionId++) { TreeSet<I> vertexIds = Sets.newTreeSet(); @@ -274,22 +264,4 @@ public class TestMessageStores { e.printStackTrace(); } } - - @Test - public void testDiskBackedMessageStoreByPartition() { - try { - MessageStoreFactory<IntWritable, IntWritable, - SequentialFileMessageStore<IntWritable, IntWritable>> - fileStoreFactory = - SequentialFileMessageStore.newFactory(config); - MessageStoreFactory<IntWritable, IntWritable, - PartitionDiskBackedMessageStore<IntWritable, IntWritable>> - partitionStoreFactory = - PartitionDiskBackedMessageStore.newFactory(config, fileStoreFactory); - testMessageStore(DiskBackedMessageStore.newFactory(service, - testData.maxMessagesInMemory, partitionStoreFactory), testData); - } catch (IOException e) { - e.printStackTrace(); - } - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java index 611b021..e3b2db0 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java @@ -57,7 +57,7 @@ public class TestIntFloatPrimitiveMessageStores { Writable> conf; @Before - public void prepare() throws IOException { + public void prepare() { service = Mockito.mock(CentralizedServiceWorker.class); Mockito.when( service.getPartitionId(Mockito.any(IntWritable.class))).thenAnswer( @@ -103,8 +103,7 @@ public class TestIntFloatPrimitiveMessageStores { } private static void insertIntFloatMessages( - MessageStore<IntWritable, FloatWritable> messageStore) throws - IOException { + MessageStore<IntWritable, FloatWritable> messageStore) { ByteArrayVertexIdMessages<IntWritable, FloatWritable> messages = createIntFloatMessages(); messages.add(new IntWritable(0), new FloatWritable(1)); @@ -122,7 +121,7 @@ public class TestIntFloatPrimitiveMessageStores { } @Test - public void testIntFloatMessageStore() throws IOException { + public void testIntFloatMessageStore() { IntFloatMessageStore messageStore = new IntFloatMessageStore(service, new FloatSumMessageCombiner()); insertIntFloatMessages(messageStore); @@ -144,7 +143,7 @@ public class TestIntFloatPrimitiveMessageStores { } @Test - public void testIntByteArrayMessageStore() throws IOException { + public void testIntByteArrayMessageStore() { IntByteArrayMessageStore<FloatWritable> messageStore = new IntByteArrayMessageStore<FloatWritable>(new TestMessageValueFactory<FloatWritable>(FloatWritable.class), @@ -174,8 +173,7 @@ public class TestIntFloatPrimitiveMessageStores { } @Test - public void testIntByteArrayMessageStoreWithMessageEncoding() throws - IOException { + public void testIntByteArrayMessageStoreWithMessageEncoding() { GiraphConstants.USE_MESSAGE_SIZE_ENCODING.set(conf, true); testIntByteArrayMessageStore(); GiraphConstants.USE_MESSAGE_SIZE_ENCODING.set(conf, false); http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java index 2027628..dc9850b 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java @@ -54,7 +54,7 @@ public class TestLongDoublePrimitiveMessageStores { service; @Before - public void prepare() throws IOException { + public void prepare() { service = Mockito.mock(CentralizedServiceWorker.class); Mockito.when( service.getPartitionId(Mockito.any(LongWritable.class))).thenAnswer( @@ -104,8 +104,7 @@ public class TestLongDoublePrimitiveMessageStores { } private static void insertLongDoubleMessages( - MessageStore<LongWritable, DoubleWritable> messageStore) throws - IOException { + MessageStore<LongWritable, DoubleWritable> messageStore) { ByteArrayVertexIdMessages<LongWritable, DoubleWritable> messages = createLongDoubleMessages(); messages.add(new LongWritable(0), new DoubleWritable(1)); @@ -123,7 +122,7 @@ public class TestLongDoublePrimitiveMessageStores { } @Test - public void testLongDoubleMessageStore() throws IOException { + public void testLongDoubleMessageStore() { LongDoubleMessageStore messageStore = new LongDoubleMessageStore(service, new DoubleSumMessageCombiner()); insertLongDoubleMessages(messageStore); @@ -145,7 +144,7 @@ public class TestLongDoublePrimitiveMessageStores { } @Test - public void testLongByteArrayMessageStore() throws IOException { + public void testLongByteArrayMessageStore() { LongByteArrayMessageStore<DoubleWritable> messageStore = new LongByteArrayMessageStore<DoubleWritable>( new TestMessageValueFactory<DoubleWritable>(DoubleWritable.class), http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java index 75edb09..ffc1288 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java @@ -41,7 +41,7 @@ public class AsyncMessageStoreWrapperTest { @Test - public void testAsyncQueue() throws IOException { + public void testAsyncQueue() { TestMessageStore store = new TestMessageStore(); AsyncMessageStoreWrapper<LongWritable, IntWritable> queue = @@ -65,7 +65,7 @@ public class AsyncMessageStoreWrapperTest { private int counters[] = new int[5]; @Override - public void addPartitionMessages(int partition, VertexIdMessages messages) throws IOException { + public void addPartitionMessages(int partition, VertexIdMessages messages) { assertNotNull(messages); counters[partition]++; } @@ -76,17 +76,17 @@ public class AsyncMessageStoreWrapperTest { } @Override - public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) throws IOException { + public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) { return null; } @Override - public void clearVertexMessages(LongWritable vertexId) throws IOException { + public void clearVertexMessages(LongWritable vertexId) { } @Override - public void clearAll() throws IOException { + public void clearAll() { } @@ -111,7 +111,7 @@ public class AsyncMessageStoreWrapperTest { } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java index 01c2613..e3d396c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java +++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java @@ -19,8 +19,6 @@ package org.apache.giraph.graph; import com.google.common.collect.Lists; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.ArrayListEdges; http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java index 49a338c..79fcd3c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java +++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java @@ -19,7 +19,6 @@ package org.apache.giraph.jython; import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.GiraphTypes; @@ -43,11 +42,6 @@ import static org.junit.Assert.assertEquals; public class TestJythonComputation { @Test - public void testCountEdgesDiskBackedMessageStoreFactory() throws Exception { - testCountEdges(DiskBackedMessageStoreFactory.class); - } - - @Test public void testCountEdgesInMemoryMessageStoreFactory() throws Exception { testCountEdges(InMemoryMessageStoreFactory.class); }
