Clean up the old out-of-core message mechanism Summary: With the new out-of-core infrastructure, there is no need for the old out-of-core message mechanism. The old mechanism also interferes with the new one. It seems that the old out-of-core message mechanism is not necessary anymore. This diff removes the old out-of-core message mechanism and cleans up its implications on the rest of the code base.
Test Plan: mvn clean verify snapshot tests passes Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Differential Revision: https://reviews.facebook.net/D58701 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c94dd9c7 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c94dd9c7 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c94dd9c7 Branch: refs/heads/trunk Commit: c94dd9c74895d419f72974fcbe5456fbae84b7a9 Parents: 6256a76 Author: Hassan Eslami <[email protected]> Authored: Tue May 31 10:37:00 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue May 31 10:37:00 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/comm/ServerData.java | 20 +- .../messages/AbstractListPerVertexStore.java | 3 +- .../ByteArrayMessagesPerVertexStore.java | 32 +- .../messages/InMemoryMessageStoreFactory.java | 5 - .../giraph/comm/messages/MessageStore.java | 17 +- .../comm/messages/MessageStoreFactory.java | 10 - .../comm/messages/OneMessagePerVertexStore.java | 7 +- .../messages/PointerListPerVertexStore.java | 43 +- .../comm/messages/SimpleMessageStore.java | 8 +- .../out_of_core/DiskBackedMessageStore.java | 305 ------------- .../DiskBackedMessageStoreFactory.java | 97 ---- .../PartitionDiskBackedMessageStore.java | 370 ---------------- .../out_of_core/SequentialFileMessageStore.java | 437 ------------------- .../comm/messages/out_of_core/package-info.java | 21 - .../primitives/IdByteArrayMessageStore.java | 33 +- .../primitives/IdOneMessagePerVertexStore.java | 11 +- .../primitives/IntByteArrayMessageStore.java | 32 +- .../primitives/IntFloatMessageStore.java | 11 +- .../primitives/LongDoubleMessageStore.java | 11 +- .../long_id/LongAbstractMessageStore.java | 7 +- .../long_id/LongByteArrayMessageStore.java | 25 +- .../long_id/LongPointerListMessageStore.java | 45 +- .../queue/AsyncMessageStoreWrapper.java | 12 +- 
.../NettyWorkerClientRequestProcessor.java | 23 +- .../SendPartitionCurrentMessagesRequest.java | 8 +- .../requests/SendWorkerMessagesRequest.java | 12 +- .../SendWorkerOneMessageToManyRequest.java | 102 +++-- .../org/apache/giraph/conf/GiraphConstants.java | 22 - .../giraph/ooc/data/DiskBackedMessageStore.java | 17 +- .../giraph/partition/SimplePartition.java | 33 +- .../apache/giraph/comm/RequestFailureTest.java | 2 +- .../org/apache/giraph/comm/RequestTest.java | 8 +- .../apache/giraph/comm/TestMessageStores.java | 38 +- .../TestIntFloatPrimitiveMessageStores.java | 12 +- .../TestLongDoublePrimitiveMessageStores.java | 9 +- .../queue/AsyncMessageStoreWrapperTest.java | 12 +- .../apache/giraph/graph/TestVertexAndEdges.java | 2 - .../giraph/jython/TestJythonComputation.java | 6 - 38 files changed, 248 insertions(+), 1620 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 69fbfee..4156d8c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -18,7 +18,6 @@ package org.apache.giraph.comm; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -234,10 +233,8 @@ public class ServerData<I extends WritableComparable, /** * Re-initialize message stores. * Discards old values if any. 
- * - * @throws IOException */ - public void resetMessageStores() throws IOException { + public void resetMessageStores() { if (currentMessageStore != null) { currentMessageStore.clearAll(); currentMessageStore = null; @@ -252,12 +249,7 @@ public class ServerData<I extends WritableComparable, /** Prepare for next superstep */ public void prepareSuperstep() { if (currentMessageStore != null) { - try { - currentMessageStore.clearAll(); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to clear previous message store"); - } + currentMessageStore.clearAll(); } MessageStore<I, Writable> nextCurrentMessageStore; @@ -422,13 +414,7 @@ public class ServerData<I extends WritableComparable, partition.putVertex(vertex); } else if (originalVertex != null) { partition.removeVertex(vertexId); - try { - getCurrentMessageStore().clearVertexMessages(vertexId); - } catch (IOException e) { - throw new IllegalStateException("resolvePartitionMutations: " + - "Caught IOException while clearing messages for a deleted " + - "vertex due to a mutation"); - } + getCurrentMessageStore().clearVertexMessages(vertexId); } context.progress(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java index 6840f86..c28dff5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java @@ -26,7 +26,6 @@ import org.apache.giraph.utils.VertexIdIterator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.IOException; import 
java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -89,7 +88,7 @@ public abstract class AbstractListPerVertexStore<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { ConcurrentMap<I, L> partitionMap = map.get(getPartitionId(vertexId)); if (partitionMap == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java index 29a0888..efbe11b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java @@ -95,7 +95,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, @Override public void addPartitionMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { + int partitionId, VertexIdMessages<I, M> messages) { ConcurrentMap<I, DataInputOutput> partitionMap = getOrCreatePartitionMap(partitionId); VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator = @@ -117,17 +117,22 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, } } } else { - VertexIdMessageIterator<I, M> vertexIdMessageIterator = - messages.getVertexIdMessageIterator(); - while (vertexIdMessageIterator.hasNext()) { - vertexIdMessageIterator.next(); - DataInputOutput dataInputOutput = - getDataInputOutput(partitionMap, vertexIdMessageIterator); - - synchronized (dataInputOutput) { - VerboseByteStructMessageWrite.verboseWriteCurrentMessage( - 
vertexIdMessageIterator, dataInputOutput.getDataOutput()); + try { + VertexIdMessageIterator<I, M> vertexIdMessageIterator = + messages.getVertexIdMessageIterator(); + while (vertexIdMessageIterator.hasNext()) { + vertexIdMessageIterator.next(); + DataInputOutput dataInputOutput = + getDataInputOutput(partitionMap, vertexIdMessageIterator); + + synchronized (dataInputOutput) { + VerboseByteStructMessageWrite.verboseWriteCurrentMessage( + vertexIdMessageIterator, dataInputOutput.getDataOutput()); + } } + } catch (IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding messages for a partition: " + e); } } } @@ -225,10 +230,5 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, this.service = service; this.config = conf; } - - @Override - public boolean shouldTraverseMessagesInOrder() { - return false; - } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index 27980a9..99a12c5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -206,9 +206,4 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable, this.service = service; this.conf = conf; } - - @Override - public boolean shouldTraverseMessagesInOrder() { - return false; - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java index 6e85ea3..9c56d85 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java @@ -48,24 +48,20 @@ public interface MessageStore<I extends WritableComparable, * * @param vertexId Vertex id for which we want to get messages * @return Iterable of messages for a vertex id - * @throws java.io.IOException */ - Iterable<M> getVertexMessages(I vertexId) throws IOException; + Iterable<M> getVertexMessages(I vertexId); /** * Clears messages for a vertex. * * @param vertexId Vertex id for which we want to clear messages - * @throws IOException */ - void clearVertexMessages(I vertexId) throws IOException; + void clearVertexMessages(I vertexId); /** * Clears all resources used by this store. - * - * @throws IOException */ - void clearAll() throws IOException; + void clearAll(); /** * Check if we have messages for some vertex @@ -88,11 +84,9 @@ public interface MessageStore<I extends WritableComparable, * * @param partitionId Id of partition * @param messages Collection of vertex ids and messages we want to add - * @throws IOException */ void addPartitionMessages( - int partitionId, VertexIdMessages<I, M> messages) - throws IOException; + int partitionId, VertexIdMessages<I, M> messages); /** * Called before start of computation in bspworker @@ -113,9 +107,8 @@ public interface MessageStore<I extends WritableComparable, * Clears messages for a partition. * * @param partitionId Partition id for which we want to clear messages - * @throws IOException */ - void clearPartition(int partitionId) throws IOException; + void clearPartition(int partitionId); /** * Serialize messages for one partition. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java index 41076e3..6a18aa8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java @@ -50,14 +50,4 @@ public interface MessageStoreFactory<I extends WritableComparable, */ void initialize(CentralizedServiceWorker<I, ?, ?> service, ImmutableClassesGiraphConfiguration<I, ?, ?> conf); - - /** - * This method is more for the performance optimization. If the message - * traversal would be done in order then data structure which is optimized - * for such traversal can be used. - * - * @return true if the messages would be traversed in order - * else return false - */ - boolean shouldTraverseMessagesInOrder(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java index ad0a5dc..1d67014 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java @@ -71,7 +71,7 @@ public class OneMessagePerVertexStore<I extends WritableComparable, @Override public void addPartitionMessages( int partitionId, - VertexIdMessages<I, M> messages) throws IOException { + VertexIdMessages<I, M> messages) 
{ ConcurrentMap<I, M> partitionMap = getOrCreatePartitionMap(partitionId); VertexIdMessageIterator<I, M> vertexIdMessageIterator = @@ -175,10 +175,5 @@ public class OneMessagePerVertexStore<I extends WritableComparable, this.service = service; this.config = conf; } - - @Override - public boolean shouldTraverseMessagesInOrder() { - return false; - } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java index cce0439..4b32a17 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java @@ -77,26 +77,31 @@ public class PointerListPerVertexStore<I extends WritableComparable, @Override public void addPartitionMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { - VertexIdMessageIterator<I, M> vertexIdMessageIterator = - messages.getVertexIdMessageIterator(); - long pointer = 0; - LongArrayList list; - while (vertexIdMessageIterator.hasNext()) { - vertexIdMessageIterator.next(); - M msg = vertexIdMessageIterator.getCurrentMessage(); - list = getOrCreateList(vertexIdMessageIterator); - if (vertexIdMessageIterator.isNewMessage()) { - IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); - pointer = indexAndDataOut.getIndex(); - pointer <<= 32; - ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); - pointer += dataOutput.getPos(); - msg.write(dataOutput); - } - synchronized (list) { - list.add(pointer); + int partitionId, VertexIdMessages<I, M> messages) { + try { + VertexIdMessageIterator<I, M> vertexIdMessageIterator = + 
messages.getVertexIdMessageIterator(); + long pointer = 0; + LongArrayList list; + while (vertexIdMessageIterator.hasNext()) { + vertexIdMessageIterator.next(); + M msg = vertexIdMessageIterator.getCurrentMessage(); + list = getOrCreateList(vertexIdMessageIterator); + if (vertexIdMessageIterator.isNewMessage()) { + IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); + pointer = indexAndDataOut.getIndex(); + pointer <<= 32; + ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); + pointer += dataOutput.getPos(); + msg.write(dataOutput); + } + synchronized (list) { + list.add(pointer); + } } + } catch (IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding messages for a partition: " + e); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java index 054302d..9c3ef7f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java @@ -157,7 +157,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId)); if (partitionMap == null) { return Collections.<M>emptyList(); @@ -197,7 +197,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable, } @Override - public void clearVertexMessages(I vertexId) throws IOException { + public void clearVertexMessages(I vertexId) { ConcurrentMap<I, ?> partitionMap = 
map.get(getPartitionId(vertexId)); if (partitionMap != null) { @@ -206,7 +206,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable, } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.remove(partitionId); } @@ -217,7 +217,7 @@ public abstract class SimpleMessageStore<I extends WritableComparable, } @Override - public void clearAll() throws IOException { + public void clearAll() { map.clear(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java deleted file mode 100644 index 0d7009b..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.comm.messages.out_of_core; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.ConcurrentMap; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.conf.MessageClasses; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.VertexIdMessageIterator; -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -/** - * Message store which separates data by partitions, - * and submits them to underlying message store. - * - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data - * @param <M> Message data - */ -public class DiskBackedMessageStore<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> implements - MessageStore<I, M> { - /** Message value factory */ - private final MessageClasses<I, M> messageClasses; - /** Service worker */ - private final CentralizedServiceWorker<I, V, E> service; - /** Number of messages to keep in memory */ - private final int maxNumberOfMessagesInMemory; - /** Factory for creating file stores when flushing */ - private final MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> - partitionStoreFactory; - /** Map from partition id to its message store */ - private final ConcurrentMap<Integer, PartitionDiskBackedMessageStore<I, M>> - partitionMessageStores; - - /** - * Constructor - * - * @param messageClasses Message classes information - * @param service Service worker - * @param maxNumberOfMessagesInMemory Number of messages to keep in memory - * @param partitionStoreFactory Factory for creating stores for a - 
* partition - */ - public DiskBackedMessageStore( - MessageClasses<I, M> messageClasses, - CentralizedServiceWorker<I, V, E> service, - int maxNumberOfMessagesInMemory, - MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, - M>> partitionStoreFactory) { - this.messageClasses = messageClasses; - this.service = service; - this.maxNumberOfMessagesInMemory = maxNumberOfMessagesInMemory; - this.partitionStoreFactory = partitionStoreFactory; - partitionMessageStores = Maps.newConcurrentMap(); - } - - @Override - public boolean isPointerListEncoding() { - return false; - } - - @Override - public void addPartitionMessages( - int partitionId, - VertexIdMessages<I, M> messages) throws IOException { - PartitionDiskBackedMessageStore<I, M> partitionMessageStore = - getMessageStore(partitionId); - VertexIdMessageIterator<I, M> - vertexIdMessageIterator = - messages.getVertexIdMessageIterator(); - while (vertexIdMessageIterator.hasNext()) { - vertexIdMessageIterator.next(); - boolean ownsVertexId = - partitionMessageStore.addVertexMessages( - vertexIdMessageIterator.getCurrentVertexId(), - Collections.singleton( - vertexIdMessageIterator.getCurrentMessage())); - if (ownsVertexId) { - vertexIdMessageIterator.releaseCurrentVertexId(); - } - } - checkMemory(); - } - - @Override - public void finalizeStore() { - } - - @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { - if (hasMessagesForVertex(vertexId)) { - return getMessageStore(vertexId).getVertexMessages(vertexId); - } else { - return EmptyIterable.get(); - } - } - - @Override - public boolean hasMessagesForVertex(I vertexId) { - return getMessageStore(vertexId).hasMessagesForVertex(vertexId); - } - - @Override - public boolean hasMessagesForPartition(int partitionId) { - PartitionDiskBackedMessageStore<I, M> partitionMessages = - getMessageStore(partitionId); - return partitionMessages != null && !Iterables - .isEmpty(partitionMessages.getDestinationVertices()); - } - - @Override - 
public Iterable<I> getPartitionDestinationVertices(int partitionId) { - PartitionDiskBackedMessageStore<I, M> messageStore = - partitionMessageStores.get(partitionId); - if (messageStore == null) { - return Collections.emptyList(); - } else { - return messageStore.getDestinationVertices(); - } - } - - @Override - public void clearVertexMessages(I vertexId) throws IOException { - if (hasMessagesForVertex(vertexId)) { - getMessageStore(vertexId).clearVertexMessages(vertexId); - } - } - - @Override - public void clearPartition(int partitionId) throws IOException { - PartitionDiskBackedMessageStore<I, M> messageStore = - partitionMessageStores.get(partitionId); - if (messageStore != null) { - messageStore.clearAll(); - } - } - - @Override - public void clearAll() throws IOException { - for (PartitionDiskBackedMessageStore<I, M> messageStore : - partitionMessageStores.values()) { - messageStore.clearAll(); - } - partitionMessageStores.clear(); - } - - /** - * Checks the memory status, flushes if necessary - * - * @throws IOException - */ - private void checkMemory() throws IOException { - while (memoryFull()) { - flushOnePartition(); - } - } - - /** - * Check if memory is full - * - * @return True iff memory is full - */ - private boolean memoryFull() { - int totalMessages = 0; - for (PartitionDiskBackedMessageStore<I, M> messageStore : - partitionMessageStores.values()) { - totalMessages += messageStore.getNumberOfMessages(); - } - return totalMessages > maxNumberOfMessagesInMemory; - } - - /** - * Finds biggest partition and flushes it to the disk - * - * @throws IOException - */ - private void flushOnePartition() throws IOException { - int maxMessages = 0; - PartitionDiskBackedMessageStore<I, M> biggestStore = null; - for (PartitionDiskBackedMessageStore<I, M> messageStore : - partitionMessageStores.values()) { - int numMessages = messageStore.getNumberOfMessages(); - if (numMessages > maxMessages) { - maxMessages = numMessages; - biggestStore = messageStore; - } - } 
- if (biggestStore != null) { - biggestStore.flush(); - } - } - - /** - * Get message store for partition which holds vertex with required vertex - * id - * - * @param vertexId Id of vertex for which we are asking for message store - * @return Requested message store - */ - private PartitionDiskBackedMessageStore<I, M> getMessageStore(I vertexId) { - int partitionId = - service.getVertexPartitionOwner(vertexId).getPartitionId(); - return getMessageStore(partitionId); - } - - /** - * Get message store for partition id. It it doesn't exist yet, - * creates a new one. - * - * @param partitionId Id of partition for which we are asking for message - * store - * @return Requested message store - */ - private PartitionDiskBackedMessageStore<I, M> getMessageStore( - int partitionId) { - PartitionDiskBackedMessageStore<I, M> messageStore = - partitionMessageStores.get(partitionId); - if (messageStore != null) { - return messageStore; - } - messageStore = partitionStoreFactory.newStore(messageClasses); - PartitionDiskBackedMessageStore<I, M> store = - partitionMessageStores.putIfAbsent(partitionId, messageStore); - return (store == null) ? 
messageStore : store; - } - - @Override - public void writePartition(DataOutput out, - int partitionId) throws IOException { - PartitionDiskBackedMessageStore<I, M> partitionStore = - partitionMessageStores.get(partitionId); - out.writeBoolean(partitionStore != null); - if (partitionStore != null) { - partitionStore.write(out); - } - } - - @Override - public void readFieldsForPartition(DataInput in, - int partitionId) throws IOException { - if (in.readBoolean()) { - PartitionDiskBackedMessageStore<I, M> messageStore = - partitionStoreFactory.newStore(messageClasses); - messageStore.readFields(in); - partitionMessageStores.put(partitionId, messageStore); - } - } - - - /** - * Create new factory for this message store - * - * @param service Service worker - * @param maxMessagesInMemory Number of messages to keep in memory - * @param fileStoreFactory Factory for creating file stores when - * flushing - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data - * @param <M> Message data - * @return Factory - */ - public static <I extends WritableComparable, V extends Writable, - E extends Writable, M extends Writable> - MessageStoreFactory<I, M, MessageStore<I, M>> newFactory( - CentralizedServiceWorker<I, V, E> service, - int maxMessagesInMemory, - MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> - fileStoreFactory) { - return new DiskBackedMessageStoreFactory<I, V, E, M>(service, - maxMessagesInMemory, - fileStoreFactory); - } -} - http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java deleted file mode 100644 index 728a2ed..0000000 
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.comm.messages.out_of_core; - -import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.MessageClasses; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Message store factory which persist the messages on the disk. 
- * - * @param <I> vertex id - * @param <V> vertex data - * @param <E> edge data - * @param <M> message data - */ -public class DiskBackedMessageStoreFactory<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - implements MessageStoreFactory<I, M, MessageStore<I, M>> { - /** Service worker */ - private CentralizedServiceWorker<I, V, E> service; - /** Number of messages to keep in memory */ - private int maxMessagesInMemory; - /** Factory for creating file stores when flushing */ - private MessageStoreFactory<I, M, - PartitionDiskBackedMessageStore<I, M>> fileStoreFactory; - - /** - * Default constructor class helps in class invocation via Reflection - */ - public DiskBackedMessageStoreFactory() { - } - - /** - * @param service Service worker - * @param maxMessagesInMemory Number of messages to keep in memory - * @param fileStoreFactory Factory for creating file stores when flushing - */ - public DiskBackedMessageStoreFactory( - CentralizedServiceWorker<I, V, E> service, - int maxMessagesInMemory, - MessageStoreFactory<I, M, - PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) { - this.service = service; - this.maxMessagesInMemory = maxMessagesInMemory; - this.fileStoreFactory = fileStoreFactory; - } - - @Override - public MessageStore<I, M> - newStore(MessageClasses<I, M> messageClasses) { - return new DiskBackedMessageStore<I, V, E, M>(messageClasses, - service, maxMessagesInMemory, fileStoreFactory); - } - - @Override - public void initialize(CentralizedServiceWorker service, - ImmutableClassesGiraphConfiguration conf) { - this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf); - - MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>> - fileMessageStoreFactory = - SequentialFileMessageStore.newFactory(conf); - this.fileStoreFactory = - PartitionDiskBackedMessageStore.newFactory(conf, - fileMessageStoreFactory); - - this.service = service; - } - - @Override - public boolean 
shouldTraverseMessagesInOrder() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java deleted file mode 100644 index 698281f..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.comm.messages.out_of_core; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.MessageClasses; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Message storage with in-memory map of messages and with support for - * flushing all the messages to the disk. Holds messages for a single partition. 
- * - * @param <I> Vertex id - * @param <M> Message data - */ -public class PartitionDiskBackedMessageStore<I extends WritableComparable, - M extends Writable> implements Writable { - /** Message classes */ - private final MessageClasses<I, M> messageClasses; - /** Message value factory */ - private final MessageValueFactory<M> messageValueFactory; - /** - * In-memory message map (must be sorted to insure that the ids are - * ordered) - */ - private volatile ConcurrentNavigableMap<I, DataInputOutput> - inMemoryMessages; - /** Hadoop configuration */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; - /** Counter for number of messages in-memory */ - private final AtomicInteger numberOfMessagesInMemory; - /** To keep vertex ids which we have messages for */ - private final Set<I> destinationVertices; - /** File stores in which we keep flushed messages */ - private final Collection<SequentialFileMessageStore<I, M>> fileStores; - /** Factory for creating file stores when flushing */ - private final - MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> fileStoreFactory; - /** Lock for disk flushing */ - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); - - /** - * Constructor. 
- * - * @param messageClasses Message classes information - * @param config Hadoop configuration - * @param fileStoreFactory Factory for creating file stores when flushing - */ - public PartitionDiskBackedMessageStore( - MessageClasses<I, M> messageClasses, - ImmutableClassesGiraphConfiguration<I, ?, ?> config, - MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> - fileStoreFactory) { - inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>(); - this.messageClasses = messageClasses; - this.messageValueFactory = messageClasses.createMessageValueFactory(config); - this.config = config; - numberOfMessagesInMemory = new AtomicInteger(0); - destinationVertices = - Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap()); - fileStores = Lists.newArrayList(); - this.fileStoreFactory = fileStoreFactory; - } - - /** - * Add vertex messages - * - * @param vertexId Vertex id to use - * @param messages Messages to add (note that the lifetime of the messages) - * is only until next() is called again) - * @return True if the vertex id ownership is taken by this method, - * false otherwise - * @throws IOException - */ - boolean addVertexMessages(I vertexId, - Iterable<M> messages) throws IOException { - boolean ownsVertexId = false; - destinationVertices.add(vertexId); - rwLock.readLock().lock(); - try { - DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId); - if (dataInputOutput == null) { - DataInputOutput newDataInputOutput = - config.createMessagesInputOutput(); - dataInputOutput = - inMemoryMessages.putIfAbsent(vertexId, newDataInputOutput); - if (dataInputOutput == null) { - ownsVertexId = true; - dataInputOutput = newDataInputOutput; - } - } - - synchronized (dataInputOutput) { - for (M message : messages) { - message.write(dataInputOutput.getDataOutput()); - numberOfMessagesInMemory.getAndIncrement(); - } - } - } finally { - rwLock.readLock().unlock(); - } - - return ownsVertexId; - } - - /** - * Get the messages for a vertex. 
- * - * @param vertexId Vertex id for which we want to get messages - * @return Iterable of messages for a vertex id - */ - public Iterable<M> getVertexMessages(I vertexId) throws IOException { - DataInputOutput dataInputOutput = inMemoryMessages.get(vertexId); - if (dataInputOutput == null) { - dataInputOutput = config.createMessagesInputOutput(); - } - Iterable<M> combinedIterable = new MessagesIterable<M>( - dataInputOutput, messageValueFactory); - - for (SequentialFileMessageStore<I, M> fileStore : fileStores) { - combinedIterable = Iterables.concat(combinedIterable, - fileStore.getVertexMessages(vertexId)); - } - return combinedIterable; - } - - /** - * Get number of messages in memory - * - * @return Number of messages in memory - */ - public int getNumberOfMessages() { - return numberOfMessagesInMemory.get(); - } - - /** - * Check if we have messages for some vertex - * - * @param vertexId Id of vertex which we want to check - * @return True iff we have messages for vertex with required id - */ - public boolean hasMessagesForVertex(I vertexId) { - return destinationVertices.contains(vertexId); - } - - /** - * Gets vertex ids which we have messages for - * - * @return Iterable over vertex ids which we have messages for - */ - public Iterable<I> getDestinationVertices() { - return destinationVertices; - } - - /** - * Clears messages for a vertex. - * - * @param vertexId Vertex id for which we want to clear messages - * @throws IOException - */ - public void clearVertexMessages(I vertexId) throws IOException { - inMemoryMessages.remove(vertexId); - } - - /** - * Clears all resources used by this store. - * - * @throws IOException - */ - public void clearAll() throws IOException { - inMemoryMessages.clear(); - destinationVertices.clear(); - for (SequentialFileMessageStore<I, M> fileStore : fileStores) { - fileStore.clearAll(); - } - fileStores.clear(); - } - - /** - * Flushes messages to the disk. 
- * - * @throws IOException - */ - public void flush() throws IOException { - ConcurrentNavigableMap<I, DataInputOutput> messagesToFlush = null; - rwLock.writeLock().lock(); - try { - messagesToFlush = inMemoryMessages; - inMemoryMessages = new ConcurrentSkipListMap<I, DataInputOutput>(); - numberOfMessagesInMemory.set(0); - } finally { - rwLock.writeLock().unlock(); - } - SequentialFileMessageStore<I, M> fileStore = - fileStoreFactory.newStore(messageClasses); - fileStore.addMessages(messagesToFlush); - - synchronized (fileStores) { - fileStores.add(fileStore); - } - } - - @Override - public void write(DataOutput out) throws IOException { - // write destination vertices - out.writeInt(destinationVertices.size()); - for (I vertexId : destinationVertices) { - vertexId.write(out); - } - - // write of in-memory messages - out.writeInt(numberOfMessagesInMemory.get()); - - // write in-memory messages map - out.writeInt(inMemoryMessages.size()); - for (Entry<I, DataInputOutput> entry : inMemoryMessages.entrySet()) { - entry.getKey().write(out); - entry.getValue().write(out); - } - - // write file stores - out.writeInt(fileStores.size()); - for (SequentialFileMessageStore<I, M> fileStore : fileStores) { - fileStore.write(out); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - // read destination vertices - int numVertices = in.readInt(); - for (int v = 0; v < numVertices; v++) { - I vertexId = config.createVertexId(); - vertexId.readFields(in); - destinationVertices.add(vertexId); - } - - // read in-memory messages - numberOfMessagesInMemory.set(in.readInt()); - - // read in-memory map - int mapSize = in.readInt(); - for (int m = 0; m < mapSize; m++) { - I vertexId = config.createVertexId(); - vertexId.readFields(in); - DataInputOutput dataInputOutput = config.createMessagesInputOutput(); - dataInputOutput.readFields(in); - inMemoryMessages.put(vertexId, dataInputOutput); - } - - // read file stores - int numFileStores = in.readInt(); - 
for (int s = 0; s < numFileStores; s++) { - SequentialFileMessageStore<I, M> fileStore = - fileStoreFactory.newStore(messageClasses); - fileStore.readFields(in); - fileStores.add(fileStore); - } - } - - - /** - * Create new factory for this message store - * - * @param config Hadoop configuration - * @param fileStoreFactory Factory for creating message stores for - * partitions - * @param <I> Vertex id - * @param <M> Message data - * @return Factory - */ - public static <I extends WritableComparable, M extends Writable> - MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> newFactory( - ImmutableClassesGiraphConfiguration<I, ?, ?> config, - MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> - fileStoreFactory) { - return new Factory<I, M>(config, fileStoreFactory); - } - - /** - * Factory for {@link PartitionDiskBackedMessageStore} - * - * @param <I> Vertex id - * @param <M> Message data - */ - private static class Factory<I extends WritableComparable, - M extends Writable> implements MessageStoreFactory<I, M, - PartitionDiskBackedMessageStore<I, M>> { - /** Hadoop configuration */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; - /** Factory for creating message stores for partitions */ - private final MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> - fileStoreFactory; - - /** - * @param config Hadoop configuration - * @param fileStoreFactory Factory for creating message stores for - * partitions - */ - public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config, - MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> - fileStoreFactory) { - this.config = config; - this.fileStoreFactory = fileStoreFactory; - } - - @Override - public PartitionDiskBackedMessageStore<I, M> newStore( - MessageClasses<I, M> messageClasses) { - return new PartitionDiskBackedMessageStore<I, M>(messageClasses, - config, fileStoreFactory); - } - - @Override - public void initialize(CentralizedServiceWorker<I, ?, ?> 
service, - ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { - /* Implementation of this method is required if the class is to - * be exposed publicly and allow instantiating the class via the - * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is - * a private class, hence the implementation of this method is skipped - * as the caller knows the specific required constructor parameters - * for instantiation. - */ - } - - @Override - public boolean shouldTraverseMessagesInOrder() { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java deleted file mode 100644 index 8f589bc..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.comm.messages.out_of_core; - -import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.MessageClasses; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -/** - * Used for writing and reading collection of messages to the disk. - * {@link SequentialFileMessageStore#addMessages(NavigableMap)} - * should be called only once with the messages we want to store. - * <p/> - * It's optimized for retrieving messages in the natural order of vertex ids - * they are sent to. 
- * - * @param <I> Vertex id - * @param <M> Message data - */ -public class SequentialFileMessageStore<I extends WritableComparable, - M extends Writable> implements Writable { - /** Class logger */ - private static final Logger LOG = - Logger.getLogger(SequentialFileMessageStore.class); - /** Message class */ - private final MessageValueFactory<M> messageValueFactory; - /** File in which we store data */ - private final File file; - /** Configuration which we need for reading data */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; - /** Buffer size to use when reading and writing files */ - private final int bufferSize; - /** File input stream */ - private DataInputStream in; - /** How many vertices do we have left to read in the file */ - private int verticesLeft; - /** Id of currently read vertex */ - private I currentVertexId; - - /** - * Stores message on the disk. - * - * - * @param messageValueFactory Used to create message values - * @param config Configuration used later for reading - * @param bufferSize Buffer size to use when reading and writing - * @param fileName File in which we want to store messages - * @throws IOException - */ - public SequentialFileMessageStore( - MessageValueFactory<M> messageValueFactory, - ImmutableClassesGiraphConfiguration<I, ?, ?> config, - int bufferSize, - String fileName) { - this.messageValueFactory = messageValueFactory; - this.config = config; - this.bufferSize = bufferSize; - file = new File(fileName); - } - - /** - * Adds messages from one message store to another - * - * @param messageMap Add the messages from this map to this store - * @throws java.io.IOException - */ - public void addMessages(NavigableMap<I, DataInputOutput> messageMap) - throws IOException { - // Writes messages to its file - if (file.exists()) { - if (LOG.isDebugEnabled()) { - LOG.debug("addMessages: Deleting " + file); - } - if (!file.delete()) { - throw new IOException("Failed to delete existing file " + file); - } - } - 
if (!file.createNewFile()) { - throw new IOException("Failed to create file " + file); - } - if (LOG.isDebugEnabled()) { - LOG.debug("addMessages: Creating " + file); - } - - DataOutputStream out = null; - - try { - out = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(file), bufferSize)); - int destinationVertexIdCount = messageMap.size(); - out.writeInt(destinationVertexIdCount); - - // Dump the vertices and their messages in a sorted order - for (Map.Entry<I, DataInputOutput> entry : messageMap.entrySet()) { - I destinationVertexId = entry.getKey(); - destinationVertexId.write(out); - DataInputOutput dataInputOutput = entry.getValue(); - Iterable<M> messages = new MessagesIterable<M>( - dataInputOutput, messageValueFactory); - int messageCount = Iterables.size(messages); - out.writeInt(messageCount); - if (LOG.isDebugEnabled()) { - LOG.debug("addMessages: For vertex id " + destinationVertexId + - ", messages = " + messageCount + " to file " + file); - } - for (M message : messages) { - if (LOG.isDebugEnabled()) { - LOG.debug("addMessages: Wrote " + message + " to " + file); - } - message.write(out); - } - } - } finally { - if (out != null) { - out.close(); - } - } - } - - /** - * Reads messages for a vertex. It will find the messages only if all - * previous reads used smaller vertex ids than this one - messages should - * be retrieved in increasing order of vertex ids. 
- * - * @param vertexId Vertex id for which we want to get messages - * @return Messages for the selected vertex, or empty list if not used - * correctly - * @throws IOException - */ - public Iterable<M> getVertexMessages(I vertexId) throws - IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("getVertexMessages: Reading for vertex id " + vertexId + - " (currently " + currentVertexId + ") from " + file); - } - if (in == null) { - startReading(); - } - - I nextVertexId = getCurrentVertexId(); - while (nextVertexId != null && vertexId.compareTo(nextVertexId) > 0) { - nextVertexId = getNextVertexId(); - } - - if (nextVertexId == null || vertexId.compareTo(nextVertexId) < 0) { - return EmptyIterable.get(); - } - - return readMessagesForCurrentVertex(); - } - - /** - * Clears all resources used by this store. - */ - public void clearAll() throws IOException { - endReading(); - if (!file.delete()) { - LOG.error("clearAll: Failed to delete file " + file); - } - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeLong(file.length()); - FileInputStream input = new FileInputStream(file); - try { - byte[] buffer = new byte[bufferSize]; - while (true) { - int length = input.read(buffer); - if (length < 0) { - break; - } - out.write(buffer, 0, length); - } - } finally { - input.close(); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - FileOutputStream output = new FileOutputStream(file); - try { - long fileLength = in.readLong(); - byte[] buffer = new byte[bufferSize]; - for (long position = 0; position < fileLength; position += bufferSize) { - int bytes = (int) Math.min(bufferSize, fileLength - position); - in.readFully(buffer, 0, bytes); - output.write(buffer); - } - } finally { - output.close(); - } - } - - /** - * Prepare for reading - * - * @throws IOException - */ - private void startReading() throws IOException { - currentVertexId = null; - in = new DataInputStream( - new BufferedInputStream(new 
FileInputStream(file), bufferSize)); - verticesLeft = in.readInt(); - if (LOG.isDebugEnabled()) { - LOG.debug("startReading: File " + file + " with " + - verticesLeft + " vertices left"); - } - } - - /** - * Gets current vertex id. - * <p/> - * If there is a vertex id whose messages haven't been read yet it - * will return that vertex id, otherwise it will read and return the next - * one. - * - * @return Current vertex id - * @throws IOException - */ - private I getCurrentVertexId() throws IOException { - if (currentVertexId != null) { - return currentVertexId; - } else { - return getNextVertexId(); - } - } - - /** - * Gets next vertex id. - * <p/> - * If there is a vertex whose messages haven't been read yet it - * will read and skip over its messages to get to the next vertex. - * - * @return Next vertex id - * @throws IOException - */ - private I getNextVertexId() throws IOException { - if (currentVertexId != null) { - readMessagesForCurrentVertex(); - } - if (verticesLeft == 0) { - return null; - } - currentVertexId = config.createVertexId(); - currentVertexId.readFields(in); - return currentVertexId; - } - - /** - * Reads messages for current vertex. - * - * @return Messages for current vertex - * @throws IOException - */ - private Collection<M> readMessagesForCurrentVertex() throws IOException { - int messagesSize = in.readInt(); - List<M> messages = Lists.newArrayListWithCapacity(messagesSize); - for (int i = 0; i < messagesSize; i++) { - M message = messageValueFactory.newInstance(); - try { - message.readFields(in); - } catch (IOException e) { - throw new IllegalStateException("readMessagesForCurrentVertex: " + - "Failed to read message from " + i + " of " + - messagesSize + " for vertex id " + currentVertexId + " from " + - file, e); - } - messages.add(message); - } - currentVertexDone(); - return messages; - } - - /** - * Release current vertex. 
- * - * @throws IOException - */ - private void currentVertexDone() throws IOException { - currentVertexId = null; - verticesLeft--; - if (verticesLeft == 0) { - endReading(); - } - } - - /** - * Call when we are done reading, for closing files. - * - * @throws IOException - */ - private void endReading() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("endReading: Stopped reading " + file); - } - if (in != null) { - in.close(); - in = null; - } - } - - /** - * Create new factory for this message store - * - * @param config Hadoop configuration - * @param <I> Vertex id - * @param <M> Message data - * @return Factory - */ - public static <I extends WritableComparable, M extends Writable> - MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> newFactory( - ImmutableClassesGiraphConfiguration<I, ?, ?> config) { - return new Factory<I, M>(config); - } - - /** - * Factory for {@link SequentialFileMessageStore} - * - * @param <I> Vertex id - * @param <M> Message data - */ - private static class Factory<I extends WritableComparable, - M extends Writable> - implements MessageStoreFactory<I, M, SequentialFileMessageStore<I, M>> { - /** Hadoop configuration */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; - /** Directories in which we'll keep necessary files */ - private final String[] directories; - /** Buffer size to use when reading and writing */ - private final int bufferSize; - /** Counter for created message stores */ - private final AtomicInteger storeCounter; - - /** - * Constructor. 
- * - * @param config Hadoop configuration - */ - public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) { - this.config = config; - String jobId = config.get("mapred.job.id", "Unknown Job"); - int taskId = config.getTaskPartition(); - List<String> userPaths = MESSAGES_DIRECTORY.getList(config); - Collections.shuffle(userPaths); - directories = new String[userPaths.size()]; - int i = 0; - for (String path : userPaths) { - String directory = path + File.separator + jobId + File.separator + - taskId + File.separator; - directories[i++] = directory; - if (!new File(directory).mkdirs()) { - LOG.error("SequentialFileMessageStore$Factory: Failed to create " + - directory); - } - } - this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config); - storeCounter = new AtomicInteger(); - } - - @Override - public SequentialFileMessageStore<I, M> newStore( - MessageClasses<I, M> messageClasses) { - int idx = Math.abs(storeCounter.getAndIncrement()); - String fileName = - directories[idx % directories.length] + "messages-" + idx; - return new SequentialFileMessageStore<I, M>( - messageClasses.createMessageValueFactory(config), config, - bufferSize, fileName); - } - - @Override - public void initialize(CentralizedServiceWorker<I, ?, ?> service, - ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { - /* Implementation of this method is required if the class is to - * be exposed publicly and allow instantiating the class via the - * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is - * a private class, hence the implementation of this method is skipped - * as the caller knows the specific required constructor parameters - * for instantiation. 
- */ - } - - @Override - public boolean shouldTraverseMessagesInOrder() { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java deleted file mode 100644 index 7039378..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Package of out-of-core messages related classes. 
- */ -package org.apache.giraph.comm.messages.out_of_core; http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java index 2e39857..57f3ff6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java @@ -142,7 +142,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable, @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<I, M> messages) throws IOException { + VertexIdMessages<I, M> messages) { Basic2ObjectMap<I, DataInputOutput> partitionMap = map.get(partitionId); synchronized (partitionMap) { VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator = @@ -161,22 +161,27 @@ public class IdByteArrayMessageStore<I extends WritableComparable, dataInputOutput.getDataOutput()); } } else { - VertexIdMessageIterator<I, M> iterator = - messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = - getDataInputOutput(partitionMap, iterator.getCurrentVertexId()); - - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); + try { + VertexIdMessageIterator<I, M> iterator = + messages.getVertexIdMessageIterator(); + while (iterator.hasNext()) { + iterator.next(); + DataInputOutput dataInputOutput = + getDataInputOutput(partitionMap, iterator.getCurrentVertexId()); + + VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, + dataInputOutput.getDataOutput()); + } + } catch 
(IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding message for a partition: " + e); } } } } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -193,7 +198,7 @@ public class IdByteArrayMessageStore<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId); if (dataInputOutput == null) { return EmptyIterable.get(); @@ -203,12 +208,12 @@ public class IdByteArrayMessageStore<I extends WritableComparable, } @Override - public void clearVertexMessages(I vertexId) throws IOException { + public void clearVertexMessages(I vertexId) { getPartitionMap(vertexId).remove(vertexId); } @Override - public void clearAll() throws IOException { + public void clearAll() { map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java index 42fe992..4463ddb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java @@ -125,7 +125,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, @Override public void addPartitionMessages( int partitionId, - VertexIdMessages<I, M> messages) throws IOException { + VertexIdMessages<I, M> messages) { Basic2ObjectMap<I, M> partitionMap = 
map.get(partitionId); synchronized (partitionMap) { VertexIdMessageIterator<I, M> @@ -152,7 +152,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -168,8 +168,7 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages( - I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { Basic2ObjectMap<I, M> partitionMap = getPartitionMap(vertexId); if (!partitionMap.containsKey(vertexId)) { return EmptyIterable.get(); @@ -179,12 +178,12 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, } @Override - public void clearVertexMessages(I vertexId) throws IOException { + public void clearVertexMessages(I vertexId) { getPartitionMap(vertexId).remove(vertexId); } @Override - public void clearAll() throws IOException { + public void clearAll() { map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java index 4c363f3..4ef9e76 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java @@ -127,8 +127,7 @@ public class IntByteArrayMessageStore<M extends Writable> @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<IntWritable, M> messages) throws - IOException { + VertexIdMessages<IntWritable, M> messages) { 
Int2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId); synchronized (partitionMap) { @@ -149,14 +148,19 @@ public class IntByteArrayMessageStore<M extends Writable> dataInputOutput.getDataOutput()); } } else { - VertexIdMessageIterator<IntWritable, M> - iterator = messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - iterator.getCurrentVertexId().get()); - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); + try { + VertexIdMessageIterator<IntWritable, M> + iterator = messages.getVertexIdMessageIterator(); + while (iterator.hasNext()) { + iterator.next(); + DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, + iterator.getCurrentVertexId().get()); + VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, + dataInputOutput.getDataOutput()); + } + } catch (IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding messages for a partition: " + e); } } } @@ -167,7 +171,7 @@ public class IntByteArrayMessageStore<M extends Writable> } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -185,7 +189,7 @@ public class IntByteArrayMessageStore<M extends Writable> @Override public Iterable<M> getVertexMessages( - IntWritable vertexId) throws IOException { + IntWritable vertexId) { DataInputOutput dataInputOutput = getPartitionMap(vertexId).get(vertexId.get()); if (dataInputOutput == null) { @@ -196,12 +200,12 @@ public class IntByteArrayMessageStore<M extends Writable> } @Override - public void clearVertexMessages(IntWritable vertexId) throws IOException { + public void clearVertexMessages(IntWritable vertexId) { getPartitionMap(vertexId).remove(vertexId.get()); } @Override - public void clearAll() throws IOException { + 
public void clearAll() { map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java index 280f5b9..715bf45 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java @@ -96,8 +96,7 @@ public class IntFloatMessageStore @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<IntWritable, FloatWritable> messages) throws - IOException { + VertexIdMessages<IntWritable, FloatWritable> messages) { IntWritable reusableVertexId = new IntWritable(); FloatWritable reusableMessage = new FloatWritable(); FloatWritable reusableCurrentMessage = new FloatWritable(); @@ -128,7 +127,7 @@ public class IntFloatMessageStore } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -145,7 +144,7 @@ public class IntFloatMessageStore @Override public Iterable<FloatWritable> getVertexMessages( - IntWritable vertexId) throws IOException { + IntWritable vertexId) { Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId); if (!partitionMap.containsKey(vertexId.get())) { return EmptyIterable.get(); @@ -156,12 +155,12 @@ public class IntFloatMessageStore } @Override - public void clearVertexMessages(IntWritable vertexId) throws IOException { + public void clearVertexMessages(IntWritable vertexId) { getPartitionMap(vertexId).remove(vertexId.get()); } @Override - public void clearAll() throws IOException { + public void clearAll() { 
map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java index d8a3fde..4fc4843 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java @@ -97,8 +97,7 @@ public class LongDoubleMessageStore @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<LongWritable, DoubleWritable> messages) throws - IOException { + VertexIdMessages<LongWritable, DoubleWritable> messages) { LongWritable reusableVertexId = new LongWritable(); DoubleWritable reusableMessage = new DoubleWritable(); DoubleWritable reusableCurrentMessage = new DoubleWritable(); @@ -129,7 +128,7 @@ public class LongDoubleMessageStore } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -146,7 +145,7 @@ public class LongDoubleMessageStore @Override public Iterable<DoubleWritable> getVertexMessages( - LongWritable vertexId) throws IOException { + LongWritable vertexId) { Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId); if (!partitionMap.containsKey(vertexId.get())) { return EmptyIterable.get(); @@ -157,12 +156,12 @@ public class LongDoubleMessageStore } @Override - public void clearVertexMessages(LongWritable vertexId) throws IOException { + public void clearVertexMessages(LongWritable vertexId) { getPartitionMap(vertexId).remove(vertexId.get()); } @Override - public void clearAll() throws IOException { + public void 
clearAll() { map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java index a0c977e..b3ed4b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java @@ -29,7 +29,6 @@ import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.io.IOException; import java.util.List; /** @@ -91,7 +90,7 @@ public abstract class LongAbstractMessageStore<M extends Writable, T> } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { map.get(partitionId).clear(); } @@ -107,13 +106,13 @@ public abstract class LongAbstractMessageStore<M extends Writable, T> } @Override - public void clearVertexMessages(LongWritable vertexId) throws IOException { + public void clearVertexMessages(LongWritable vertexId) { getPartitionMap(vertexId).remove(vertexId.get()); } @Override - public void clearAll() throws IOException { + public void clearAll() { map.clear(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java index 092d963..bcdab98 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java @@ -89,7 +89,7 @@ public class LongByteArrayMessageStore<M extends Writable> @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<LongWritable, M> messages) throws IOException { + VertexIdMessages<LongWritable, M> messages) { Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId); synchronized (partitionMap) { VertexIdMessageBytesIterator<LongWritable, M> @@ -109,14 +109,19 @@ public class LongByteArrayMessageStore<M extends Writable> dataInputOutput.getDataOutput()); } } else { - VertexIdMessageIterator<LongWritable, M> - iterator = messages.getVertexIdMessageIterator(); - while (iterator.hasNext()) { - iterator.next(); - DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, - iterator.getCurrentVertexId().get()); - VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, - dataInputOutput.getDataOutput()); + try { + VertexIdMessageIterator<LongWritable, M> + iterator = messages.getVertexIdMessageIterator(); + while (iterator.hasNext()) { + iterator.next(); + DataInputOutput dataInputOutput = getDataInputOutput(partitionMap, + iterator.getCurrentVertexId().get()); + VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator, + dataInputOutput.getDataOutput()); + } + } catch (IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding messages for a partition: " + e); } } } @@ -128,7 +133,7 @@ public class LongByteArrayMessageStore<M extends Writable> @Override public Iterable<M> getVertexMessages( - LongWritable vertexId) throws IOException { + LongWritable vertexId) { DataInputOutput 
dataInputOutput = getPartitionMap(vertexId).get(vertexId.get()); if (dataInputOutput == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java index 32296ad..eef75ba 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java @@ -80,32 +80,37 @@ public class LongPointerListMessageStore<M extends Writable> @Override public void addPartitionMessages(int partitionId, - VertexIdMessages<LongWritable, M> messages) throws IOException { - VertexIdMessageIterator<LongWritable, M> iterator = - messages.getVertexIdMessageIterator(); - long pointer = 0; - LongArrayList list; - while (iterator.hasNext()) { - iterator.next(); - M msg = iterator.getCurrentMessage(); - list = getList(iterator); - if (iterator.isNewMessage()) { - IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); - pointer = indexAndDataOut.getIndex(); - pointer <<= 32; - ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); - pointer += dataOutput.getPos(); - msg.write(dataOutput); - } - synchronized (list) { // TODO - any better way? 
- list.add(pointer); + VertexIdMessages<LongWritable, M> messages) { + try { + VertexIdMessageIterator<LongWritable, M> iterator = + messages.getVertexIdMessageIterator(); + long pointer = 0; + LongArrayList list; + while (iterator.hasNext()) { + iterator.next(); + M msg = iterator.getCurrentMessage(); + list = getList(iterator); + if (iterator.isNewMessage()) { + IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut(); + pointer = indexAndDataOut.getIndex(); + pointer <<= 32; + ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput(); + pointer += dataOutput.getPos(); + msg.write(dataOutput); + } + synchronized (list) { // TODO - any better way? + list.add(pointer); + } } + } catch (IOException e) { + throw new RuntimeException("addPartitionMessages: IOException while" + + " adding messages for a partition: " + e); } } @Override public Iterable<M> getVertexMessages( - LongWritable vertexId) throws IOException { + LongWritable vertexId) { LongArrayList list = getPartitionMap(vertexId).get( vertexId.get()); if (list == null) { http://git-wip-us.apache.org/repos/asf/giraph/blob/c94dd9c7/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java index 04afba5..6273694 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java @@ -113,17 +113,17 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable, } @Override - public Iterable<M> getVertexMessages(I vertexId) throws IOException { + public Iterable<M> getVertexMessages(I vertexId) { return store.getVertexMessages(vertexId); } 
@Override - public void clearVertexMessages(I vertexId) throws IOException { + public void clearVertexMessages(I vertexId) { store.clearVertexMessages(vertexId); } @Override - public void clearAll() throws IOException { + public void clearAll() { try { for (BlockingQueue<PartitionMessage<I, M>> queue : queues) { queue.put(SHUTDOWN_QUEUE_MESSAGE); @@ -147,7 +147,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable, @Override public void addPartitionMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { + int partitionId, VertexIdMessages<I, M> messages) { int hash = partition2Queue.get(partitionId); try { queues[hash].put(new PartitionMessage<>(partitionId, messages)); @@ -167,7 +167,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable, } @Override - public void clearPartition(int partitionId) throws IOException { + public void clearPartition(int partitionId) { store.clearPartition(partitionId); } @@ -232,7 +232,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable, return; } } - } catch (IOException | InterruptedException e) { + } catch (InterruptedException e) { LOG.error("MessageStoreQueueWorker.run: " + message, e); return; }
