GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/969a4881 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/969a4881 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/969a4881 Branch: refs/heads/release-1.1 Commit: 969a488183a42dedbde8dcec7ac00595a836974c Parents: 02d9e6c Author: Pavan Kumar <[email protected]> Authored: Fri Jul 18 16:20:17 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Fri Jul 18 16:20:17 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/comm/ServerData.java | 11 + .../messages/InMemoryMessageStoreFactory.java | 13 +- .../queue/AsyncMessageStoreWrapper.java | 238 +++++++++++++++++++ .../comm/messages/queue/PartitionMessage.java | 71 ++++++ .../comm/messages/queue/package-info.java | 22 ++ .../org/apache/giraph/conf/GiraphConstants.java | 8 + .../apache/giraph/graph/GraphTaskManager.java | 5 +- .../apache/giraph/worker/BspServiceWorker.java | 8 + .../queue/AsyncMessageStoreWrapperTest.java | 123 ++++++++++ .../org/apache/giraph/TestCheckpointing.java | 13 +- 11 files changed, 508 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 4207339..d6d998c 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka) + GIRAPH-924: Fix checkpointing (edunov via majakabiljo) GIRAPH-921: Create ByteValueVertex to store vertex values as bytes without object instance (akyrola via majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 036510e..29488fc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -23,6 +23,7 @@ import org.apache.giraph.comm.aggregators.AllAggregatorServerData; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.EdgeStore; @@ -202,6 +203,16 @@ public class ServerData<I extends WritableComparable, } /** + * In case of async message store we have to wait for all messages + * to be processed before going into next superstep. 
+ */ + public void waitForComplete() { + if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { + ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); + } + } + + /** * Get the vertex mutations (synchronize on the values) * * @return Vertex mutations http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index db22503..02ea7b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -24,8 +24,9 @@ import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore; import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore; import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore; -import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; +import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.DoubleWritable; @@ -155,6 +156,16 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable, (conf.useMessageCombiner() ? 
" message combiner " + conf.getMessageCombinerClass() : " no combiner")); } + + int asyncMessageStoreThreads = + GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf); + if (asyncMessageStoreThreads > 0) { + messageStore = new AsyncMessageStoreWrapper( + messageStore, + service.getPartitionStore().getPartitionIds(), + asyncMessageStoreThreads); + } + return messageStore; } http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java new file mode 100644 index 0000000..a62834f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.comm.messages.queue; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import it.unimi.dsi.fastutil.ints.Int2IntArrayMap; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; + +/** + * This class decouples message receiving and processing + * into separate threads thus reducing contention. + * It does not provide message store functionality itself, rather + * providing a wrapper around existing message stores that + * can now be used in async mode with only slight modifications. 
+ * @param <I> Vertex id + * @param <M> Message data + */ +public final class AsyncMessageStoreWrapper<I extends WritableComparable, + M extends Writable> implements MessageStore<I, M> { + + /** Logger */ + private static final Logger LOG = + Logger.getLogger(AsyncMessageStoreWrapper.class); + /** Pass this id to clear the queues and shutdown all threads + * started by this processor */ + private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE = + new PartitionMessage(-1, null); + /** Pass this message to clear the queues but keep threads alive */ + private static final PartitionMessage CLEAR_QUEUE_MESSAGE = + new PartitionMessage(-1, null); + /** Executor that processes messages in background */ + private static final ExecutorService EXECUTOR_SERVICE = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("AsyncMessageStoreWrapper-%d").build()); + + /** Number of threads that will process messages in background */ + private final int threadsCount; + /** Queue that temporarily stores messages */ + private final BlockingQueue<PartitionMessage<I, M>>[] queues; + /** Map from partition id to thread that processes this partition */ + private final Int2IntMap partition2Queue; + /** Signals that all processing is done */ + private Semaphore completionSemaphore; + /** Underlying message store */ + private final MessageStore<I, M> store; + + /** + * Constructs async wrapper around existing message store + * object. Requires partition list and number of threads + * to properly initialize background threads and assign partitions. + * Partitions are assigned to threads in round-robin fashion. + * It guarantees that all threads have almost the same number of + * partitions (+-1) no matter how partitions are assigned to this worker. + * @param store underlying message store to be used in computation + * @param partitions partitions assigned to this worker + * @param threadCount number of threads that will be used to process + * messages. 
+ */ + public AsyncMessageStoreWrapper(MessageStore<I, M> store, + Iterable<Integer> partitions, + int threadCount) { + this.store = store; + this.threadsCount = threadCount; + completionSemaphore = new Semaphore(1 - threadsCount); + queues = new BlockingQueue[threadsCount]; + partition2Queue = new Int2IntArrayMap(); + LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount); + + for (int i = 0; i < threadsCount; i++) { + queues[i] = new LinkedBlockingQueue<>(); + EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i])); + } + + int cnt = 0; + for (int partitionId : partitions) { + partition2Queue.put(partitionId, cnt++ % threadsCount); + } + + } + + @Override + public boolean isPointerListEncoding() { + return store.isPointerListEncoding(); + } + + @Override + public Iterable<M> getVertexMessages(I vertexId) throws IOException { + return store.getVertexMessages(vertexId); + } + + @Override + public void clearVertexMessages(I vertexId) throws IOException { + store.clearVertexMessages(vertexId); + } + + @Override + public void clearAll() throws IOException { + try { + for (BlockingQueue<PartitionMessage<I, M>> queue : queues) { + queue.put(SHUTDOWN_QUEUE_MESSAGE); + } + completionSemaphore.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + store.clearAll(); + } + + @Override + public boolean hasMessagesForVertex(I vertexId) { + return store.hasMessagesForVertex(vertexId); + } + + @Override + public void addPartitionMessages( + int partitionId, VertexIdMessages<I, M> messages) throws IOException { + int hash = partition2Queue.get(partitionId); + try { + queues[hash].put(new PartitionMessage<>(partitionId, messages)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void finalizeStore() { + store.finalizeStore(); + } + + @Override + public Iterable<I> getPartitionDestinationVertices(int partitionId) { + return store.getPartitionDestinationVertices(partitionId); 
+ } + + @Override + public void clearPartition(int partitionId) throws IOException { + store.clearPartition(partitionId); + } + + @Override + public void writePartition(DataOutput out, int partitionId) + throws IOException { + store.writePartition(out, partitionId); + } + + @Override + public void readFieldsForPartition(DataInput in, int partitionId) + throws IOException { + store.readFieldsForPartition(in, partitionId); + } + + /** + * Wait till all messages are processed and all queues are empty. + */ + public void waitToComplete() { + try { + for (BlockingQueue<PartitionMessage<I, M>> queue : queues) { + queue.put(CLEAR_QUEUE_MESSAGE); + } + completionSemaphore.acquire(); + completionSemaphore = new Semaphore(1 - threadsCount); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * This runnable has logic for background thread + * that actually does message processing. + */ + private class MessageStoreQueueWorker implements Runnable { + /** + * Queue assigned to this background thread. + */ + private final BlockingQueue<PartitionMessage<I, M>> queue; + + /** + * Constructs runnable. 
+ * @param queue where messages are put by client + */ + private MessageStoreQueueWorker( + BlockingQueue<PartitionMessage<I, M>> queue) { + this.queue = queue; + } + + @Override + public void run() { + PartitionMessage<I, M> message = null; + while (true) { + try { + message = queue.take(); + if (message.getMessage() != null) { + int partitionId = message.getPartitionId(); + store.addPartitionMessages(partitionId, message.getMessage()); + } else { + completionSemaphore.release(); + if (message == SHUTDOWN_QUEUE_MESSAGE) { + return; + } + } + } catch (IOException | InterruptedException e) { + LOG.error("MessageStoreQueueWorker.run: " + message, e); + return; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java new file mode 100644 index 0000000..8c884ce --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.comm.messages.queue; + +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Small wrapper that holds a reference to vertex message + * and knows partition id. + * @param <I> vertexId type parameter + * @param <M> message type parameter + */ +public class PartitionMessage<I extends WritableComparable, + M extends Writable> { + /** partition id */ + private int partitionId; + /** vertex message */ + private VertexIdMessages<I, M> message; + + /** + * Constructs wrapper from partition id and vertex message + * object. + * @param partitionId destination partition id + * @param message message object + */ + public PartitionMessage(int partitionId, VertexIdMessages<I, M> message) { + this.partitionId = partitionId; + this.message = message; + } + + /** + * Partition id + * @return destination partition id. 
+ */ + public int getPartitionId() { + return partitionId; + } + + /** + * Message + * @return vertex message + */ + public VertexIdMessages<I, M> getMessage() { + return message; + } + + @Override + public String toString() { + return "PartitionMessage{" + + "partitionId=" + partitionId + + ", message=" + message + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java new file mode 100644 index 0000000..e54f9f2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package for message store queue, that decouples netty threads from + * threads processing messages. 
+ */ +package org.apache.giraph.comm.messages.queue; http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 3d16e9c..0424a47 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -1143,5 +1143,13 @@ public interface GiraphConstants { "org.apache.hadoop.io.compress.DefaultCodec", "Defines compression algorithm we will be using for " + "storing checkpoint"); + + /** Number of threads to use in async message store, 0 means + * we should not use async message processing */ + IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT = + new IntConfOption("giraph.async.message.store.threads", 0, + "Number of threads to be used in async message store."); + + } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index b2a5c84..684f4eb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -38,8 +38,6 @@ import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; -import org.apache.giraph.time.SystemTime; -import org.apache.giraph.time.Time; import 
org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; @@ -102,8 +100,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** Name of metric for time from first message till last message flushed */ public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms"; - /** Time instance used for timing in this class */ - private static final Time TIME = SystemTime.get(); /** Class logger */ private static final Logger LOG = Logger.getLogger(GraphTaskManager.class); /** Coordination service worker */ @@ -304,6 +300,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } finishedSuperstepStats = completeSuperstepAndCollectStats( partitionStatsList, superstepTimerContext); + // END of superstep compute loop } http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 0d90a59..d2d24ee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -26,6 +26,8 @@ import org.apache.giraph.comm.WorkerClient; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.WorkerServer; import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerClient; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; @@ 
-891,6 +893,12 @@ public class BspServiceWorker<I extends WritableComparable, aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor); + MessageStore<I, Writable> incomingMessageStore = + getServerData().getIncomingMessageStore(); + if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { + ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); + } + if (LOG.isInfoEnabled()) { LOG.info("finishSuperstep: Superstep " + getSuperstep() + ", messages = " + workerSentMessages + " " + http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java new file mode 100644 index 0000000..ca1031a --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.comm.messages.queue; + +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.factories.TestMessageValueFactory; +import org.apache.giraph.utils.ByteArrayVertexIdMessages; +import org.apache.giraph.utils.VertexIdMessages; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Test case for AsyncMessageStoreWrapper + */ +public class AsyncMessageStoreWrapperTest { + + + @Test + public void testAsyncQueue() throws IOException { + TestMessageStore store = new TestMessageStore(); + + AsyncMessageStoreWrapper<LongWritable, IntWritable> queue = + new AsyncMessageStoreWrapper<>(store, + Arrays.asList(0, 1, 2, 3, 4), 2); + + for (int i = 0; i < 1000; i++) { + queue.addPartitionMessages(i % 5, new ByteArrayVertexIdMessages<LongWritable, IntWritable>(new TestMessageValueFactory<>(IntWritable.class))); + } + + queue.waitToComplete(); + + assertArrayEquals(new int[] {200, 200, 200, 200, 200}, store.counters); + + queue.clearAll(); + } + + + static class TestMessageStore implements MessageStore<LongWritable, IntWritable> { + + private int counters[] = new int[5]; + + @Override + public void addPartitionMessages(int partition, VertexIdMessages messages) throws IOException { + assertNotNull(messages); + counters[partition]++; + } + + @Override + public boolean isPointerListEncoding() { + return false; + } + + @Override + public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) throws IOException { + return null; + } + + @Override + public void clearVertexMessages(LongWritable vertexId) throws IOException { + + } + + @Override + public void clearAll() throws IOException { + + } + + @Override + public boolean 
hasMessagesForVertex(LongWritable vertexId) { + return false; + } + + @Override + public void finalizeStore() { + + } + + @Override + public Iterable<LongWritable> getPartitionDestinationVertices(int partitionId) { + return null; + } + + @Override + public void clearPartition(int partitionId) throws IOException { + + } + + @Override + public void writePartition(DataOutput out, int partitionId) throws IOException { + + } + + @Override + public void readFieldsForPartition(DataInput in, int partitionId) throws IOException { + + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java index 387b937..2939af7 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java @@ -71,9 +71,17 @@ public class TestCheckpointing extends BspCase { super(TestCheckpointing.class.getName()); } + @Test + public void testBspCheckpoint() throws InterruptedException, IOException, ClassNotFoundException { + testBspCheckpoint(false); + } @Test - public void testBspCheckpoint() + public void testAsyncMessageStoreCheckpoint() throws InterruptedException, IOException, ClassNotFoundException { + testBspCheckpoint(true); + } + + public void testBspCheckpoint(boolean useAsyncMessageStore) throws IOException, InterruptedException, ClassNotFoundException { Path checkpointsDir = getTempPath("checkpointing"); Path outputPath = getTempPath(getCallingMethodName()); @@ -88,6 +96,9 @@ public class TestCheckpointing extends BspCase { conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class); conf.set("mapred.job.id", TEST_JOB_ID); conf.set(KEY_MIN_SUPERSTEP, 
"0"); + if (useAsyncMessageStore) { + GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2); + } GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath); GiraphConfiguration configuration = job.getConfiguration();
