New out-of-core infrastructure (first patch including fixed out-of-core mechanism)
Summary: This is a redesign of the out-of-core mechanism. The new implementation allows for much more intelligent partition scheduling and I/O. Test Plan: mvn clean verify Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis Reviewed By: dionysis.logothetis Differential Revision: https://reviews.facebook.net/D54549 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fafecee7 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fafecee7 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fafecee7 Branch: refs/heads/trunk Commit: fafecee712bc9b2ce8ef081d8170cdf99c48288b Parents: c6af3ed Author: Sergey Edunov <[email protected]> Authored: Tue Mar 15 10:40:20 2016 -0700 Committer: Sergey Edunov <[email protected]> Committed: Tue Mar 15 10:40:20 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/giraph/comm/ServerData.java | 213 +- .../giraph/comm/messages/MessageData.java | 82 - .../comm/messages/SimpleMessageStore.java | 3 +- .../out_of_core/DiskBackedMessageStore.java | 6 +- .../primitives/IdByteArrayMessageStore.java | 4 +- .../primitives/IdOneMessagePerVertexStore.java | 3 +- .../primitives/IntByteArrayMessageStore.java | 4 +- .../primitives/IntFloatMessageStore.java | 3 +- .../primitives/LongDoubleMessageStore.java | 3 +- .../long_id/LongAbstractMessageStore.java | 3 +- .../NettyWorkerClientRequestProcessor.java | 2 +- .../SendPartitionCurrentMessagesRequest.java | 4 +- .../comm/requests/SendWorkerEdgesRequest.java | 2 +- .../requests/SendWorkerMessagesRequest.java | 4 +- .../SendWorkerOneMessageToManyRequest.java | 13 +- .../org/apache/giraph/conf/GiraphConstants.java | 11 - .../apache/giraph/edge/AbstractEdgeStore.java | 22 +- .../java/org/apache/giraph/edge/EdgeStore.java | 21 +- .../apache/giraph/graph/GraphTaskManager.java | 3 +- .../giraph/ooc/AdaptiveOutOfCoreEngine.java | 284 --- 
.../apache/giraph/ooc/CheckMemoryCallable.java | 478 ---- .../giraph/ooc/DiskBackedPartitionStore.java | 2149 ------------------ .../apache/giraph/ooc/FixedOutOfCoreEngine.java | 147 ++ .../giraph/ooc/FixedOutOfCoreIOScheduler.java | 211 ++ .../apache/giraph/ooc/JVMMemoryEstimator.java | 45 - .../org/apache/giraph/ooc/MemoryEstimator.java | 44 - .../org/apache/giraph/ooc/OutOfCoreEngine.java | 187 +- .../apache/giraph/ooc/OutOfCoreIOCallable.java | 90 + .../giraph/ooc/OutOfCoreIOCallableFactory.java | 184 ++ .../apache/giraph/ooc/OutOfCoreIOScheduler.java | 105 + .../giraph/ooc/OutOfCoreProcessorCallable.java | 170 -- .../giraph/ooc/data/DiskBackedEdgeStore.java | 207 ++ .../giraph/ooc/data/DiskBackedMessageStore.java | 297 +++ .../ooc/data/DiskBackedPartitionStore.java | 469 ++++ .../giraph/ooc/data/MetaPartitionManager.java | 947 ++++++++ .../giraph/ooc/data/OutOfCoreDataManager.java | 385 ++++ .../apache/giraph/ooc/data/package-info.java | 22 + .../org/apache/giraph/ooc/io/IOCommand.java | 65 + .../giraph/ooc/io/LoadPartitionIOCommand.java | 92 + .../giraph/ooc/io/StoreDataBufferIOCommand.java | 89 + .../ooc/io/StoreIncomingMessageIOCommand.java | 60 + .../giraph/ooc/io/StorePartitionIOCommand.java | 76 + .../org/apache/giraph/ooc/io/WaitIOCommand.java | 58 + .../org/apache/giraph/ooc/io/package-info.java | 21 + .../apache/giraph/partition/PartitionData.java | 116 - .../apache/giraph/partition/PartitionStore.java | 206 +- .../giraph/partition/SimplePartitionStore.java | 46 +- .../apache/giraph/worker/BspServiceWorker.java | 16 +- .../apache/giraph/comm/RequestFailureTest.java | 7 +- .../org/apache/giraph/comm/RequestTest.java | 14 +- .../giraph/partition/TestPartitionStores.java | 81 +- .../java/org/apache/giraph/TestOutOfCore.java | 46 +- 52 files changed, 4103 insertions(+), 3717 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index b177446..be34820 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -31,20 +31,32 @@ import com.google.common.collect.Maps; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.EdgeStore; +import org.apache.giraph.edge.EdgeStoreFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.graph.VertexResolver; -import org.apache.giraph.ooc.DiskBackedPartitionStore; +import org.apache.giraph.ooc.data.DiskBackedEdgeStore; +import org.apache.giraph.ooc.data.DiskBackedMessageStore; +import org.apache.giraph.ooc.data.DiskBackedPartitionStore; +import org.apache.giraph.ooc.FixedOutOfCoreEngine; +import org.apache.giraph.ooc.OutOfCoreEngine; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.partition.SimplePartitionStore; +import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; +import static 
org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; + /** * Anything that the server stores * @@ -56,12 +68,26 @@ import org.apache.log4j.Logger; public class ServerData<I extends WritableComparable, V extends Writable, E extends Writable> { /** Class logger */ - private static final Logger LOG = - Logger.getLogger(ServerData.class); + private static final Logger LOG = Logger.getLogger(ServerData.class); /** Configuration */ private final ImmutableClassesGiraphConfiguration<I, V, E> conf; /** Partition store for this worker. */ private volatile PartitionStore<I, V, E> partitionStore; + /** Edge store for this worker. */ + private final EdgeStore<I, V, E> edgeStore; + /** Message store factory */ + private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>> + messageStoreFactory; + /** + * Message store for incoming messages (messages which will be consumed + * in the next super step) + */ + private volatile MessageStore<I, Writable> incomingMessageStore; + /** + * Message store for current messages (messages which we received in + * previous super step and which will be consumed in current super step) + */ + private volatile MessageStore<I, Writable> currentMessageStore; /** * Map of partition ids to vertex mutations from other workers. These are * mutations that should be applied before execution of *current* super step. @@ -81,7 +107,7 @@ public class ServerData<I extends WritableComparable, ConcurrentMap<I, VertexMutations<I, V, E>>> partitionMutations = Maps.newConcurrentMap(); /** - * Holds aggregtors which current worker owns from current superstep + * Holds aggregators which current worker owns from current superstep */ private final OwnerAggregatorServerData ownerAggregatorData; /** @@ -100,6 +126,8 @@ public class ServerData<I extends WritableComparable, /** Job context (for progress) */ private final Mapper<?, ?, ?, ?>.Context context; + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; /** * Constructor. 
@@ -114,13 +142,32 @@ public class ServerData<I extends WritableComparable, Mapper<?, ?, ?, ?>.Context context) { this.serviceWorker = service; this.conf = conf; + this.messageStoreFactory = createMessageStoreFactory(); + EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); + edgeStoreFactory.initialize(service, conf, context); + EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore(); + PartitionStore<I, V, E> inMemoryPartitionStore = + new SimplePartitionStore<I, V, E>(conf, context); if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) { + int maxPartitionsInMemory = + GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf); + if (maxPartitionsInMemory == 0) { + throw new IllegalStateException("ServerData: Adaptive " + + "out-of-core engine is not supported yet! Number of partitions in" + + " memory should be greater than 0."); + } else { + oocEngine = new FixedOutOfCoreEngine(conf, service, + maxPartitionsInMemory); + } partitionStore = - new DiskBackedPartitionStore<I, V, E>(conf, context, - getServiceWorker()); + new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore, + conf, context, service, oocEngine); + edgeStore = + new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine); } else { - partitionStore = - new SimplePartitionStore<I, V, E>(conf, context, getServiceWorker()); + partitionStore = inMemoryPartitionStore; + edgeStore = inMemoryEdgeStore; + oocEngine = null; } ownerAggregatorData = new OwnerAggregatorServerData(context); allAggregatorData = new AllAggregatorServerData(context, conf); @@ -128,6 +175,42 @@ public class ServerData<I extends WritableComparable, } /** + * Decide which message store should be used for current application, + * and create the factory for that store + * + * @return Message store factory + */ + private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> + createMessageStoreFactory() { + Class<? 
extends MessageStoreFactory> messageStoreFactoryClass = + MESSAGE_STORE_FACTORY_CLASS.get(conf); + + MessageStoreFactory messageStoreFactoryInstance = + ReflectionUtils.newInstance(messageStoreFactoryClass); + messageStoreFactoryInstance.initialize(serviceWorker, conf); + + return messageStoreFactoryInstance; + } + + /** + * Return the out-of-core engine for this worker. + * + * @return The out-of-core engine + */ + public OutOfCoreEngine getOocEngine() { + return oocEngine; + } + + /** + * Return the edge store for this worker. + * + * @return The edge store + */ + public EdgeStore<I, V, E> getEdgeStore() { + return edgeStore; + } + + /** * Return the partition store for this worker. * * @return The partition store @@ -137,21 +220,103 @@ public class ServerData<I extends WritableComparable, } /** + * Get message store for incoming messages (messages which will be consumed + * in the next super step) + * + * @param <M> Message data + * @return Incoming message store + */ + public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() { + return (MessageStore<I, M>) incomingMessageStore; + } + + /** + * Get message store for current messages (messages which we received in + * previous super step and which will be consumed in current super step) + * + * @param <M> Message data + * @return Current message store + */ + public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() { + return (MessageStore<I, M>) currentMessageStore; + } + + /** * Re-initialize message stores. * Discards old values if any. 
+ * * @throws IOException */ public void resetMessageStores() throws IOException { - getPartitionStore().resetMessageStores(); - currentWorkerToWorkerMessages = - Collections.synchronizedList(new ArrayList<Writable>()); - incomingWorkerToWorkerMessages = - Collections.synchronizedList(new ArrayList<Writable>()); + if (currentMessageStore != null) { + currentMessageStore.clearAll(); + currentMessageStore = null; + } + if (incomingMessageStore != null) { + incomingMessageStore.clearAll(); + incomingMessageStore = null; + } + prepareSuperstep(); } - /** Prepare for next super step */ + /** Prepare for next superstep */ public void prepareSuperstep() { - partitionStore.prepareSuperstep(); + if (currentMessageStore != null) { + try { + currentMessageStore.clearAll(); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to clear previous message store"); + } + } + + MessageStore<I, Writable> nextCurrentMessageStore; + MessageStore<I, Writable> nextIncomingMessageStore; + MessageStore<I, Writable> messageStore; + + // First create the necessary in-memory message stores. If out-of-core + // mechanism is enabled, we wrap the in-memory message stores within + // disk-backed messages stores. 
+ if (incomingMessageStore != null) { + nextCurrentMessageStore = incomingMessageStore; + } else { + messageStore = messageStoreFactory.newStore( + conf.getIncomingMessageClasses()); + if (oocEngine == null) { + nextCurrentMessageStore = messageStore; + } else { + nextCurrentMessageStore = new DiskBackedMessageStore<>( + conf, messageStore, + conf.getIncomingMessageClasses().useMessageCombiner(), + serviceWorker.getSuperstep()); + } + } + + messageStore = messageStoreFactory.newStore( + conf.getOutgoingMessageClasses()); + if (oocEngine == null) { + nextIncomingMessageStore = messageStore; + } else { + nextIncomingMessageStore = new DiskBackedMessageStore<>( + conf, messageStore, + conf.getOutgoingMessageClasses().useMessageCombiner(), + serviceWorker.getSuperstep() + 1); + } + + // If out-of-core engine is enabled, we avoid overlapping of out-of-core + // decisions with change of superstep. This avoidance is done to simplify + // the design and reduce excessive use of synchronization primitives. 
+ if (oocEngine != null) { + oocEngine.getSuperstepLock().writeLock().lock(); + } + currentMessageStore = nextCurrentMessageStore; + incomingMessageStore = nextIncomingMessageStore; + if (oocEngine != null) { + oocEngine.getMetaPartitionManager().resetMessages(); + oocEngine.getSuperstepLock().writeLock().unlock(); + } + currentMessageStore.finalizeStore(); + currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages; incomingWorkerToWorkerMessages = Collections.synchronizedList(new ArrayList<Writable>()); @@ -252,8 +417,7 @@ public class ServerData<I extends WritableComparable, VertexMutations<I, V, E> vertexMutations = entry.getValue(); Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId, originalVertex, vertexMutations, - getPartitionStore().getCurrentMessageStore() - .hasMessagesForVertex(entry.getKey())); + getCurrentMessageStore().hasMessagesForVertex(entry.getKey())); if (LOG.isDebugEnabled()) { LOG.debug("resolvePartitionMutations: Resolved vertex index " + @@ -269,8 +433,7 @@ public class ServerData<I extends WritableComparable, } else if (originalVertex != null) { partition.removeVertex(vertexId); try { - getPartitionStore().getCurrentMessageStore() - .clearVertexMessages(vertexId); + getCurrentMessageStore().clearVertexMessages(vertexId); } catch (IOException e) { throw new IllegalStateException("resolvePartitionMutations: " + "Caught IOException while clearing messages for a deleted " + @@ -283,7 +446,7 @@ public class ServerData<I extends WritableComparable, // Keep track of vertices which are not here in the partition, but have // received messages - Iterable<I> destinations = getPartitionStore().getCurrentMessageStore(). + Iterable<I> destinations = getCurrentMessageStore(). 
getPartitionDestinationVertices(partitionId); if (!Iterables.isEmpty(destinations)) { for (I vertexId : destinations) { @@ -307,4 +470,14 @@ public class ServerData<I extends WritableComparable, } } } + + /** + * In case of async message store we have to wait for all messages + * to be processed before going into next superstep. + */ + public void waitForComplete() { + if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { + ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java deleted file mode 100644 index f974823..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.comm.messages; - -import org.apache.giraph.utils.VertexIdMessages; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import java.io.IOException; - -/** - * Structure that keeps message information. - * - * @param <I> Vertex id - */ -public interface MessageData<I extends WritableComparable> { - /** - * Get message store for incoming messages (messages which will be consumed - * in the next super step) - * - * @param <M> Message data type - * @return Incoming message store - */ - <M extends Writable> MessageStore<I, M> getIncomingMessageStore(); - - /** - * Get message store for current messages (messages which we received in - * previous super step and which will be consumed in current super step) - * - * @param <M> Message data type - * @return Current message store - */ - <M extends Writable> MessageStore<I, M> getCurrentMessageStore(); - - /** - * Re-initialize message stores. - * Discards old values if any. - - * @throws IOException - */ - void resetMessageStores() throws IOException; - - /** - * Adds messages for partition to current message store - * - * @param <M> Message data type - * @param partitionId Id of partition - * @param messages Collection of vertex ids and messages we want to add - * @throws IOException - */ - <M extends Writable> void addPartitionCurrentMessages( - int partitionId, VertexIdMessages<I, M> messages) - throws IOException; - - /** - * Adds messages for partition to incoming message store - * - * @param <M> Message data type - * @param partitionId Id of partition - * @param messages Collection of vertex ids and messages we want to add - * @throws IOException - */ - <M extends Writable> void addPartitionIncomingMessages( - int partitionId, VertexIdMessages<I, M> messages) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java index a1d3625..054302d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java @@ -212,7 +212,8 @@ public abstract class SimpleMessageStore<I extends WritableComparable, @Override public boolean hasMessagesForPartition(int partitionId) { - return !map.get(partitionId).isEmpty(); + ConcurrentMap<I, T> partitionMessages = map.get(partitionId); + return partitionMessages != null && !partitionMessages.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java index b28d15b..0d7009b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java @@ -132,8 +132,10 @@ public class DiskBackedMessageStore<I extends WritableComparable, @Override public boolean hasMessagesForPartition(int partitionId) { - return !Iterables - .isEmpty(getMessageStore(partitionId).getDestinationVertices()); + PartitionDiskBackedMessageStore<I, M> partitionMessages = + getMessageStore(partitionId); + return partitionMessages != null && !Iterables + .isEmpty(partitionMessages.getDestinationVertices()); } @Override 
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java index e1e7a3f..2e39857 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java @@ -187,7 +187,9 @@ public class IdByteArrayMessageStore<I extends WritableComparable, @Override public boolean hasMessagesForPartition(int partitionId) { - return map.get(partitionId).size() != 0; + Basic2ObjectMap<I, DataInputOutput> partitionMessages = + map.get(partitionId); + return partitionMessages != null && partitionMessages.size() != 0; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java index b172d24..42fe992 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java @@ -163,7 +163,8 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable, @Override public boolean hasMessagesForPartition(int partitionId) { - return map.get(partitionId).size() != 0; + Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId); 
+ return partitionMessages != null && partitionMessages.size() != 0; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java index 2fbc35c..4c363f3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java @@ -178,7 +178,9 @@ public class IntByteArrayMessageStore<M extends Writable> @Override public boolean hasMessagesForPartition(int partitionId) { - return !map.get(partitionId).isEmpty(); + Int2ObjectOpenHashMap<DataInputOutput> partitionMessages = + map.get(partitionId); + return partitionMessages != null && !partitionMessages.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java index 3186224..280f5b9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java @@ -139,7 +139,8 @@ public class IntFloatMessageStore @Override public boolean hasMessagesForPartition(int partitionId) { - return !map.get(partitionId).isEmpty(); + Int2FloatOpenHashMap partitionMessages = 
map.get(partitionId); + return partitionMessages != null && !partitionMessages.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java index 6278c16..d8a3fde 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java @@ -140,7 +140,8 @@ public class LongDoubleMessageStore @Override public boolean hasMessagesForPartition(int partitionId) { - return !map.get(partitionId).isEmpty(); + Long2DoubleOpenHashMap partitionMessages = map.get(partitionId); + return partitionMessages != null && !partitionMessages.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java index d8e5246..a0c977e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java @@ -102,7 +102,8 @@ public abstract class LongAbstractMessageStore<M extends Writable, T> @Override public boolean hasMessagesForPartition(int partitionId) { - return 
!map.get(partitionId).isEmpty(); + Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId); + return partitionMessages != null && !partitionMessages.isEmpty(); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java index e9b072a..1cd1bd6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java @@ -210,7 +210,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable, Partition<I, V, E> partition) { final int partitionId = partition.getId(); MessageStore<I, Writable> messageStore = - serverData.getPartitionStore().getCurrentMessageStore(); + serverData.getCurrentMessageStore(); ByteArrayVertexIdMessages<I, Writable> vertexIdMessages = new ByteArrayVertexIdMessages<I, Writable>( configuration.createOutgoingMessageValueFactory()); http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java index ab66aa3..b59d0cf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java @@ -86,8 
+86,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable, @Override public void doRequest(ServerData<I, V, E> serverData) { try { - serverData.getPartitionStore() - .addPartitionCurrentMessages(partitionId, vertexIdMessageMap); + serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId, + vertexIdMessageMap); } catch (IOException e) { throw new RuntimeException("doRequest: Got IOException ", e); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java index aeda197..00cf6ef 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java @@ -69,7 +69,7 @@ public class SendWorkerEdgesRequest<I extends WritableComparable, iterator = partitionVertexData.getIterator(); while (iterator.hasNext()) { iterator.next(); - serverData.getPartitionStore() + serverData.getEdgeStore() .addPartitionEdges(iterator.getCurrentFirst(), iterator.getCurrentSecond()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java index e9be327..6953998 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java @@ -71,8 +71,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable, while (iterator.hasNext()) { iterator.next(); try { - serverData.getPartitionStore(). - addPartitionIncomingMessages(iterator.getCurrentFirst(), + serverData.getIncomingMessageStore(). + addPartitionMessages(iterator.getCurrentFirst(), iterator.getCurrentSecond()); } catch (IOException e) { throw new RuntimeException("doRequest: Got IOException ", e); http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java index aeb1b1d..f8d0473 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; +import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; @@ -93,11 +94,10 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable, @Override public void doRequest(ServerData serverData) { try { - if (serverData.getPartitionStore().getIncomingMessageStore() - .isPointerListEncoding()) { + MessageStore<I, M> messageStore = serverData.getIncomingMessageStore(); + if (messageStore.isPointerListEncoding()) { // if message store is pointer list based 
then send data as is - serverData.getPartitionStore() - .addPartitionIncomingMessages(-1, oneMessageToManyIds); + messageStore.addPartitionMessages(-1, oneMessageToManyIds); } else { // else split the data per partition and send individually CentralizedServiceWorker<I, ?, ?> serviceWorker = serverData.getServiceWorker(); @@ -144,9 +144,8 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable, for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs : partitionIdMsgs.entrySet()) { if (!idMsgs.getValue().isEmpty()) { - serverData.getPartitionStore() - .addPartitionIncomingMessages(idMsgs.getKey(), - idMsgs.getValue()); + serverData.getIncomingMessageStore().addPartitionMessages( + idMsgs.getKey(), idMsgs.getValue()); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 8ad3767..4787d37 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -71,8 +71,6 @@ import org.apache.giraph.mapping.translate.TranslateEdge; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; -import org.apache.giraph.ooc.JVMMemoryEstimator; -import org.apache.giraph.ooc.MemoryEstimator; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.HashPartitionerFactory; import org.apache.giraph.partition.Partition; @@ -976,15 +974,6 @@ public interface GiraphConstants { new BooleanConfOption("giraph.useOutOfCoreGraph", false, "Enable out-of-core graph."); - /** - * Memory estimator class used in adaptive out-of-core mechanism for 
deciding - * when data should go to disk. - */ - ClassConfOption<MemoryEstimator> OUT_OF_CORE_MEM_ESTIMATOR = - ClassConfOption.create("giraph.outOfCoreMemoryEstimator", - JVMMemoryEstimator.class, MemoryEstimator.class, - "Memory estimator class used for out-of-core decisions"); - /** Number of threads participating in swapping graph/messages to disk. */ IntConfOption NUM_OOC_THREADS = new IntConfOption("giraph.numOutOfCoreThreads", 1, http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java index 9609047..0f3d668 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java @@ -42,6 +42,8 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import static com.google.common.base.Preconditions.checkState; + /** * Basic implementation of edges store, extended this to easily define simple * and primitive edge stores @@ -157,7 +159,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges); @Override - public boolean hasPartitionEdges(int partitionId) { + public boolean hasEdgesForPartition(int partitionId) { return transientEdges.containsKey(partitionId); } @@ -165,21 +167,21 @@ public abstract class AbstractEdgeStore<I extends WritableComparable, public void writePartitionEdgeStore(int partitionId, DataOutput output) throws IOException { Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId); - output.writeInt(edges.size()); - for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) { - writeVertexKey(edge.getKey(), 
output); - edge.getValue().write(output); + if (edges != null) { + output.writeInt(edges.size()); + for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) { + writeVertexKey(edge.getKey(), output); + edge.getValue().write(output); + } } } @Override public void readPartitionEdgeStore(int partitionId, DataInput input) throws IOException { - if (transientEdges.containsKey(partitionId)) { - throw new IllegalStateException("readPartitionEdgeStore: reading a " + - "partition that is already there in the partition store " + - "(impossible)"); - } + checkState(!transientEdges.containsKey(partitionId), + "readPartitionEdgeStore: reading a partition that is already there in" + + " the partition store (impossible)"); Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId); int numEntries = input.readInt(); for (int i = 0; i < numEntries; ++i) { http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 1c9d85f..f485042 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -46,21 +46,12 @@ public interface EdgeStore<I extends WritableComparable, /** * Move all edges from temporary storage to their source vertices. - * Note: this method is not thread-safe. + * Note: this method is not thread-safe and is called once all vertices and + * edges are read in INPUT_SUPERSTEP. */ void moveEdgesToVertices(); /** - * Whether the store contains edges for the given partition. 
- * Note: This method is thread-safe - * - * @param partitionId Partition id under query - * @return true if the store has any edge for the given partition, false - * otherwise - */ - boolean hasPartitionEdges(int partitionId); - - /** * Deserialize the edges of a given partition, and removes the associated data * from the store. * Note: This method is not thread-safe (i.e. should not be called for the @@ -84,4 +75,12 @@ public interface EdgeStore<I extends WritableComparable, */ void readPartitionEdgeStore(int partitionId, DataInput input) throws IOException; + + /** + * Check if edge store has edge for a given partition + * + * @param partitionId Id of partition + * @return True iff edge store have messages for the given partition + */ + boolean hasEdgesForPartition(int partitionId); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 3c09957..62a87de 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -332,8 +332,7 @@ end[PURE_YARN]*/ prepareForSuperstep(graphState); context.progress(); MessageStore<I, Writable> messageStore = - serviceWorker.getServerData().getPartitionStore() - .getCurrentMessageStore(); + serviceWorker.getServerData().getCurrentMessageStore(); int numPartitions = serviceWorker.getPartitionStore().getNumPartitions(); int numThreads = Math.min(numComputeThreads, numPartitions); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java deleted file mode 100644 index d5b0e20..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.ooc; - -import com.google.common.collect.Lists; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.utils.CallableFactory; -import org.apache.giraph.utils.LogStacktraceCallable; -import org.apache.giraph.utils.ThreadUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Adaptive out-of-core mechanism. This mechanism spawns two types of threads: - * 1) check-memory thread, which periodically monitors the amount of available - * memory and decides whether data should go on disk. This threads is - * basically the brain behind the out-of-core mechanism, commands - * "out-of-core processor threads" (type 2 thread below) to move - * appropriate data to disk, - * 2) out-of-core processor threads. This is a team of threads responsible for - * offloading appropriate data to disk. "check-memory thread" decides on - * which data should go to disk, and "out-of-core processor threads" do the - * offloading. - * - * @param <I> Vertex id - * @param <V> Vertex value - * @param <E> Edge data - */ -public class AdaptiveOutOfCoreEngine<I extends WritableComparable, - V extends Writable, E extends Writable> implements - OutOfCoreEngine<I, V, E> { - /** Class logger. 
*/ - private static final Logger LOG = - Logger.getLogger(AdaptiveOutOfCoreEngine.class); - - // ---- Synchronization Variables ---- - /** Barrier to coordinate check-memory and OOC-processing threads */ - private final CyclicBarrier gate; - /** - * Signal to determine whether OOC processing threads are done processing OOC - * requests - */ - private final CyclicBarrier doneOocSignal; - /** Signal to determine whether the computation is terminated */ - private final CountDownLatch doneCompute; - /** Finisher signal to OOC processing threads */ - private volatile boolean done; - - // ---- OOC Commands ---- - /** - * List of partitions that are on disk, and their loaded *vertices* during - * INPUT_SUPERSTEP are ready to be flushed to disk - */ - private final BlockingQueue<Integer> partitionsWithInputVertices; - /** - * List of partitions that are on disk, and their loaded *edges* during - * INPUT_SUPERSTEP are ready to be flushed to disk - */ - private final BlockingQueue<Integer> partitionsWithInputEdges; - /** - * List of partitions that are on disk, and their message buffers (either - * messages for current superstep, or incoming messages for next superstep) - * are ready to be flushed to disk - */ - private final BlockingQueue<Integer> partitionsWithPendingMessages; - /** Number of partitions to be written to disk */ - private final AtomicInteger numPartitionsToSpill; - - /** Executor service for check memory thread */ - private ExecutorService checkMemoryExecutor; - /** Executor service for out-of-core processor threads */ - private ExecutorService outOfCoreProcessorExecutor; - - /** Configuration */ - private ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Worker */ - private final CentralizedServiceWorker<I, V, E> serviceWorker; - - /** Cached value for number of out-of-core threads specified by user */ - private int numOocThreads; - - /** Result of check-memory thread (to be checked for graceful termination) */ - private Future<Void> 
checkMemoryResult; - /** - * Results of out-of-core processor threads (to be checked for graceful - * termination) - */ - private List<Future<Void>> oocProcessorResults; - - /** - * Creates an instance of adaptive mechanism - * @param conf Configuration - * @param serviceWorker Worker service - */ - public AdaptiveOutOfCoreEngine(ImmutableClassesGiraphConfiguration conf, - CentralizedServiceWorker<I, V, E> serviceWorker) { - this.conf = conf; - this.serviceWorker = serviceWorker; - - this.numOocThreads = conf.getNumOocThreads(); - this.gate = new CyclicBarrier(numOocThreads + 1); - this.doneOocSignal = new CyclicBarrier(numOocThreads + 1); - this.doneCompute = new CountDownLatch(1); - this.done = false; - this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100); - this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100); - this.partitionsWithPendingMessages = new ArrayBlockingQueue<Integer>(100); - this.numPartitionsToSpill = new AtomicInteger(0); - } - - @Override - public void initialize() { - if (LOG.isInfoEnabled()) { - LOG.info("initialize: initializing out-of-core engine"); - } - CallableFactory<Void> checkMemoryCallableFactory = - new CallableFactory<Void>() { - @Override - public Callable<Void> newCallable(int callableId) { - return new CheckMemoryCallable<I, V, E>( - AdaptiveOutOfCoreEngine.this, conf, serviceWorker); - } - }; - checkMemoryExecutor = Executors.newSingleThreadExecutor( - ThreadUtils.createThreadFactory("check-memory")); - checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>( - checkMemoryCallableFactory.newCallable(0))); - - CallableFactory<Void> outOfCoreProcessorCallableFactory = - new CallableFactory<Void>() { - @Override - public Callable<Void> newCallable(int callableId) { - return new OutOfCoreProcessorCallable<I, V, E>( - AdaptiveOutOfCoreEngine.this, serviceWorker); - } - }; - outOfCoreProcessorExecutor = - Executors.newFixedThreadPool(numOocThreads, - 
ThreadUtils.createThreadFactory("ooc-%d")); - oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads); - for (int i = 0; i < numOocThreads; ++i) { - Future<Void> future = outOfCoreProcessorExecutor.submit( - new LogStacktraceCallable<>( - outOfCoreProcessorCallableFactory.newCallable(i))); - oocProcessorResults.add(future); - } - } - - @Override - public void shutdown() { - doneCompute.countDown(); - checkMemoryExecutor.shutdown(); - if (checkMemoryResult.isCancelled()) { - throw new IllegalStateException( - "shutdown: memory check thread did not " + "terminate gracefully!"); - } - outOfCoreProcessorExecutor.shutdown(); - for (int i = 0; i < numOocThreads; ++i) { - if (oocProcessorResults.get(i).isCancelled()) { - throw new IllegalStateException("shutdown: out-of-core processor " + - "thread " + i + " did not terminate gracefully."); - } - } - } - - /** - * @return the latch that signals whether the whole computation is done - */ - public CountDownLatch getDoneCompute() { - return doneCompute; - } - - /** - * @return whether the computation is done - */ - public boolean isDone() { - return done; - } - - /** - * @return list of partitions that have large enough buffers of vertices read - * in INPUT_SUPERSTEP. - */ - public BlockingQueue<Integer> getPartitionsWithInputVertices() { - return partitionsWithInputVertices; - } - - /** - * @return list of partitions that have large enough buffers of edges read - * in INPUT_SUPERSTEP. - */ - public BlockingQueue<Integer> getPartitionsWithInputEdges() { - return partitionsWithInputEdges; - } - - /** - * @return list of partitions that have large enough message buffers. 
- */ - public BlockingQueue<Integer> getPartitionsWithPendingMessages() { - return partitionsWithPendingMessages; - } - - /** - * @return number of partitions to spill to disk - */ - public AtomicInteger getNumPartitionsToSpill() { - return numPartitionsToSpill; - } - - /** - * Wait on gate with which OOC processor threads are notified to execute - * commands provided by brain (memory-check thread). - * - * @throws BrokenBarrierException - * @throws InterruptedException - */ - public void waitOnGate() throws BrokenBarrierException, InterruptedException { - gate.await(); - } - - /** - * Reset the gate for reuse. - */ - public void resetGate() { - gate.reset(); - } - - /** - * Wait on signal from all OOC processor threads that the offloading of data - * is complete. - * - * @throws BrokenBarrierException - * @throws InterruptedException - */ - public void waitOnOocSignal() - throws BrokenBarrierException, InterruptedException { - doneOocSignal.await(); - } - - /** - * Reset the completion signal of OOC processor threads for reuse. - */ - public void resetOocSignal() { - doneOocSignal.reset(); - } - - /** - * Set the computation as done (i.e. setting the state that determines the - * whole computation is done). - */ - public void setDone() { - done = true; - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java deleted file mode 100644 index 2b2c990..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc; - -import org.apache.giraph.bsp.BspService; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.conf.FloatConfOption; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.IntConfOption; -import org.apache.giraph.utils.PairList; -import org.apache.giraph.utils.ReflectionUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import java.util.Stack; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Adaptive out-of-core mechanism brain. This class provides one thread per - * worker that periodically checks the free memory on the worker and compares it - * with total amount of memory given to that worker to run the job. The period - * at which the thread checks for the memory is specified by the user. 
Also, - * user can specify the fraction of memory where anytime free memory is less - * than that fraction of total memory, actions would be taken to free up space - * in memory (this fraction is called LOW_FREE_MEMORY_FRACTION). Also, user can - * specify another fraction of available memory where memory pressure is fair - * and some of the data on disk (if there is any) can be brought back to memory - * again (this fraction is called FAIR_FREE_MEMORY_FRACTION). - * - * In the adaptive out-of-core mechanism, if amount of free memory becomes less - * than LOW_FREE_MEMORY_FRACTION, some data are being considered as potentials - * to transfer to disk. These data can be in the following categories: - * 1) Vertex buffers read in INPUT_SUPERSTEP. These are vertex input splits - * read for a partition that is out-of-core and PartitionStore holds these - * vertex buffers in in-memory buffers (and postpone their merge with the - * actual partition until the partition is loaded back in memory). - * 2) Edge buffers read in INPUT_SUPERSTEP. These are similar buffers to - * vertex buffers, but they keep edge data in INPUT_SUPERSTEP. - * 3) Partitions. - * - * This brain prefers the first two categories in INPUT_SUPERSTEP as long as - * size of buffers are large enough that it is worth writing them to disk. In - * case where brain decides on spilling partitions to disk, the brain decides - * only on the "number of partitions" to spill to disk. It is "out-of-core - * processor threads" responsibility to find that many partitions to spill to - * disk. The number of partitions to spill is a fraction of number of partitions - * currently in memory. It is recommended that this fraction be equal to - * subtraction of LOW_FREE_MEMORY_FRACTION from FAIR_FREE_MEMORY_FRACTION. Here - * is an example to clarify on this recommendation. Assume - * LOW_FREE_MEMORY_FRACTION is 5% and FAIR_FREE_MEMORY_FRACTION is 15%. 
Also - * assume that the partitions are similar in their memory footprint (which is a - * valid assumption for most of the partitioning techniques). If free memory is - * a bit less than 5% of total available memory, if we offload 10% - * (15% - 5% = 10%), then the amount of free memory will increase to a bit less - * than 15% of total available memory. - * - * @param <I> Vertex id - * @param <V> Vertex value - * @param <E> Edge value - */ -public class CheckMemoryCallable<I extends WritableComparable, - V extends Writable, E extends Writable> implements Callable<Void> { - /** - * Lowest free memory fraction to start doing necessary actions to go - * out-of-core. - */ - public static final FloatConfOption LOW_FREE_MEMORY_FRACTION = - new FloatConfOption("giraph.lowFreeMemoryFraction", 0.1f, - "If free memory fraction goes below this value, GC is called " + - "manually and necessary actions are taken if we have to go " + - "out-of-core"); - /** - * Expected memory fraction to achieve after detecting that the job is running - * low in memory. Basically, this memory fraction is the target to achieve - * once we decide to offload data on disk. - */ - public static final FloatConfOption MID_FREE_MEMORY_FRACTION = - new FloatConfOption("giraph.midFreeMemoryFraction", 0.15f, - "Once out-of-core mechanism decides to offload data on disk, it " + - "offloads data on disk until free memory fraction reaches this " + - "fraction."); - /** - * Memory fraction at which the job gets the best performance considering the - * choice of GC strategy. It means, if the amount of free memory is more than - * this fraction we will not see severe amount of GC calls. - */ - public static final FloatConfOption FAIR_FREE_MEMORY_FRACTION = - new FloatConfOption("giraph.fairFreeMemoryFraction", 0.3f, - "The fraction of free memory at which the job shows the best GC " + - "performance. 
This fraction might be dependent on GC strategy " + - "used in running the job, but generally 0.3 is a reasonable " + - "fraction for most strategies."); - /** - * Memory fraction at which the job has enough space so we can back off from - * the last out-of-core decision, i.e. lazily bringing the last bunch of data - * spilled to disk. - */ - public static final FloatConfOption HIGH_FREE_MEMORY_FRACTION = - new FloatConfOption("giraph.highFreeMemoryFraction", 0.4f, - "Once free memory reaches at this fraction, last out-of-core " + - "decision is lazily rolled back, i.e. we back off from " + - "out-of-core."); - /** Time interval at which checking memory is done periodically. */ - public static final IntConfOption CHECK_MEMORY_INTERVAL = - new IntConfOption("giraph.checkMemoryInterval", 5000, - "Time interval (in milliseconds) at which checking memory is done" + - " to decide if there should be any out-of-core action."); - /** Coefficient by which the number of partitions in memory changes. 
*/ - public static final FloatConfOption OOC_GRAPH_MODIFICATION_COEFFICIENT = - new FloatConfOption("giraph.graphPartitionModificationCoefficient", 0.3f, - "If we decide to go out-of-core or back-off from out-of-core, this " + - "is the multiplier by which the number of in-memory partitions" + - "will change."); - - /** Class logger */ - private static final Logger LOG = Logger.getLogger(CheckMemoryCallable.class); - - /** Worker */ - private final CentralizedServiceWorker<I, V, E> serviceWorker; - /** Partition store */ - private final DiskBackedPartitionStore<I, V, E> partitionStore; - - // ---- Cached Config Values ---- - /** Cached value of LOW_FREE_MEMORY_FRACTION */ - private float lowFreeMemoryFraction; - /** Cached value for MID_FREE_MEMORY_FRACTION */ - private float midFreeMemoryFraction; - /** Cached value of FAIR_FREE_MEMORY_FRACTION */ - private float fairFreeMemoryFraction; - /** Cached value for HIGH_FREE_MEMORY_FRACTION */ - private float highFreeMemoryFraction; - /** Cached value of CHECK_MEMORY_INTERVAL */ - private int checkInterval; - /** Cached value for OOC_GRAPH_MODIFICATION_COEFFICIENT */ - private float modificationCoefficient; - - /** List of counts of number of partitions every time we shrink the store */ - private Stack<Integer> oocPartitionCounts; - /** Memory estimator instance */ - private final MemoryEstimator memoryEstimator; - /** Adaptive out-of-core engine */ - private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine; - - /** - * Constructor for check-memory thread. 
- * - * @param oocEngine out-of-core engine - * @param conf job configuration - * @param serviceWorker worker service - */ - public CheckMemoryCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine, - ImmutableClassesGiraphConfiguration<I, V, E> conf, - CentralizedServiceWorker<I, V, E> serviceWorker) { - this.oocEngine = oocEngine; - this.serviceWorker = serviceWorker; - this.partitionStore = - (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore(); - - this.oocPartitionCounts = new Stack<>(); - - this.lowFreeMemoryFraction = LOW_FREE_MEMORY_FRACTION.get(conf); - this.midFreeMemoryFraction = MID_FREE_MEMORY_FRACTION.get(conf); - this.fairFreeMemoryFraction = FAIR_FREE_MEMORY_FRACTION.get(conf); - this.highFreeMemoryFraction = HIGH_FREE_MEMORY_FRACTION.get(conf); - this.checkInterval = CHECK_MEMORY_INTERVAL.get(conf); - this.modificationCoefficient = OOC_GRAPH_MODIFICATION_COEFFICIENT.get(conf); - - memoryEstimator = ReflectionUtils - .newInstance(GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR.get(conf)); - } - - /** - * Checks whether the available free memory is enough for an efficient - * execution. If memory is limited, offload partitions to disk. - * Also, if available memory is more than a threshold, loads partitions from - * disk (if there is any) to memory. - */ - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("DM_GC") - public Void call() { - if (LOG.isInfoEnabled()) { - LOG.info("call: check-memory thread started."); - } - memoryEstimator.initialize(serviceWorker); - CountDownLatch doneCompute = oocEngine.getDoneCompute(); - while (doneCompute.getCount() != 0) { - double maxMemory = memoryEstimator.maxMemoryMB(); - double freeMemory = memoryEstimator.freeMemoryMB(); - boolean gcDone = false; - if (freeMemory < lowFreeMemoryFraction * maxMemory) { - // This is typically a bad scenario where previous GCs were not - // successful to free up enough memory. 
If we keep staying in this - // situation, usually, either the computation slows down dramatically, - // or the computation throws OOM error. So, we do GC manually, and - // make sure that out-of-core is the solution to get out of this - // situation. - if (LOG.isInfoEnabled()) { - LOG.info("call: Memory is very limited now. Calling GC manually. " + - String.format("freeMemory = %.2fMB", freeMemory)); - } - long gcStartTime = System.currentTimeMillis(); - System.gc(); - gcDone = true; - freeMemory = memoryEstimator.freeMemoryMB(); - if (LOG.isInfoEnabled()) { - LOG.info("call: GC is done. " + String - .format("GC time = %.2f sec, and freeMemory = %.2fMB", - (System.currentTimeMillis() - gcStartTime) / 1000.0, - freeMemory)); - } - } - // If we have enough memory, we roll back the latest shrink in number of - // partition slots. - // If we do not have enough memory, but we are not in a bad scenario - // either, we gradually increase the number of partition slots in memory. - // If we are low in free memory, we first push unnecessary data to disk - // and then push some partitions to disk if necessary. 
- int numInMemory = partitionStore.getNumPartitionInMemory(); - int maxInMemory = partitionStore.getNumPartitionSlots(); - int numInTotal = partitionStore.getNumPartitions(); - if (freeMemory > highFreeMemoryFraction * maxMemory) { - if (numInMemory >= maxInMemory && !oocPartitionCounts.isEmpty()) { - partitionStore.increasePartitionSlots(oocPartitionCounts.pop()); - } - } else if (freeMemory > fairFreeMemoryFraction * maxMemory) { - // Only gradually increase the number of partition slots if all slots - // are already used, and we have things out-of-core - if (!oocPartitionCounts.isEmpty() || maxInMemory < numInTotal) { - if (numInMemory >= maxInMemory) { - partitionStore.increasePartitionSlots(1); - if (!oocPartitionCounts.isEmpty()) { - int num = oocPartitionCounts.pop(); - if (num > 1) { - oocPartitionCounts.push(num - 1); - } - } - } - } - } else if (gcDone && freeMemory < midFreeMemoryFraction * maxMemory) { - BlockingQueue<Integer> partitionsWithInputVertices = - oocEngine.getPartitionsWithInputVertices(); - BlockingQueue<Integer> partitionsWithInputEdges = - oocEngine.getPartitionsWithInputEdges(); - BlockingQueue<Integer> partitionsWithPendingMessages = - oocEngine.getPartitionsWithPendingMessages(); - AtomicInteger numPartitionsToSpill = - oocEngine.getNumPartitionsToSpill(); - while (freeMemory < midFreeMemoryFraction * maxMemory) { - // Offload input vertex buffer of OOC partitions if we are in - // INPUT_SUPERSTEP - if (serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { - // List of pairs (partitionId, approximate memory footprint of - // vertex buffers of that partition). 
- PairList<Integer, Integer> pairs = - partitionStore.getOocPartitionIdsWithPendingInputVertices(); - freeMemory -= createCommands(pairs, partitionsWithInputVertices); - } - - // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP - if (freeMemory < midFreeMemoryFraction * maxMemory && - serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) { - PairList<Integer, Integer> pairs = - partitionStore.getOocPartitionIdsWithPendingInputEdges(); - freeMemory -= createCommands(pairs, partitionsWithInputEdges); - } - - // Offload message buffers of OOC partitions if we are still low in - // free memory - if (freeMemory < midFreeMemoryFraction * maxMemory) { - PairList<Integer, Integer> pairs = - partitionStore.getOocPartitionIdsWithPendingMessages(); - freeMemory -= createCommands(pairs, partitionsWithPendingMessages); - } - - // Offload partitions if we are still low in free memory - if (freeMemory < midFreeMemoryFraction * maxMemory) { - numPartitionsToSpill - .set(getNextOocPartitionCount(freeMemory, maxMemory)); - } - - if (!partitionsWithInputVertices.isEmpty() || - !partitionsWithInputEdges.isEmpty() || - !partitionsWithPendingMessages.isEmpty() || - numPartitionsToSpill.get() != 0) { - if (LOG.isInfoEnabled()) { - LOG.info("call: signal out-of-core processor threads to start " + - "offloading. 
These threads will spill vertex buffers of " + - partitionsWithInputVertices.size() + " partitions, edge " + - "buffers of " + partitionsWithInputEdges.size() + - " partitions, pending message buffers of " + - partitionsWithPendingMessages.size() + " partitions, and " + - numPartitionsToSpill.get() + " whole partitions"); - } - // Opening the gate for OOC processing threads to start spilling - // data on disk - try { - oocEngine.waitOnGate(); - } catch (InterruptedException e) { - throw new IllegalStateException("call: Caught " + - "InterruptedException while opening the gate for OOC " + - "processing threads"); - } catch (BrokenBarrierException e) { - throw new IllegalStateException("call: Caught " + - "BrokenBarrierException while opening the gate for OOC " + - "processing threads"); - } - oocEngine.resetGate(); - - if (LOG.isInfoEnabled()) { - LOG.info("call: waiting on OOC processors to finish offloading " + - "data to disk"); - } - // Wait until all OOC processing threads are done swapping data to - // disk - try { - oocEngine.waitOnOocSignal(); - } catch (InterruptedException e) { - throw new IllegalStateException("call: Caught " + - "InterruptedException. Looks like memory check thread is " + - "interrupted while waiting on OOC processing threads."); - } catch (BrokenBarrierException e) { - throw new IllegalStateException("call: Caught " + - "BrokenBarrierException. 
Looks like some OOC processing " + - "threads broke while writing data on disk."); - } - oocEngine.resetOocSignal(); - } - - gcDone = false; - long gcStartTime = 0; - if (freeMemory < midFreeMemoryFraction * maxMemory) { - // Calling GC manually to actually free up the memory for data that - // is offloaded to disk - if (LOG.isInfoEnabled()) { - LOG.info("call: calling GC manually to free up space for " + - "recently offloaded data."); - } - gcStartTime = System.currentTimeMillis(); - System.gc(); - gcDone = true; - } - freeMemory = memoryEstimator.freeMemoryMB(); - if (LOG.isInfoEnabled()) { - LOG.info("call: " + - (gcDone ? - ("GC is done. " + String.format("GC time = %.2f sec. ", - (System.currentTimeMillis() - gcStartTime) / 1000.0)) : - "") + - "Finished offloading data to disk. " + - String.format("freeMemory = %.2fMB", freeMemory)); - } - } - } - - // Either wait for the computation to be done, or the time interval passes - try { - doneCompute.await(checkInterval, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new IllegalStateException("call: Caught InterruptedException " + - "while waiting for computation to be done and/or " + checkInterval + - "milliseconds passes."); - } - } - - // Setting 'done' before the gate here and checking 'done' in OOC processing - // threads after the gate, guarantees that OOC processing threads see the - // new value of done and terminate gracefully. - oocEngine.setDone(); - try { - oocEngine.waitOnGate(); - } catch (InterruptedException e) { - throw new IllegalStateException("call: Caught InterruptedException " + - "while waiting for the last time on gate in the current superstep"); - } catch (BrokenBarrierException e) { - throw new IllegalStateException("call: Caught BrokenBarrierException " + - "while waiting for the last time on gate in the current superstep"); - } - return null; - } - - /** - * Returns the number of partitions that should go out-of-core at this point. 
- * - * @return number of partitions that should go out-of-core - * @param freeMemory amount of free memory (in MB) - * @param maxMemory amount of max memory (in MB) - */ - private int getNextOocPartitionCount(double freeMemory, double maxMemory) { - int numSlots = partitionStore.getNumPartitionSlots(); - if (numSlots == Integer.MAX_VALUE) { - numSlots = partitionStore.getNumPartitions(); - partitionStore.setNumPartitionSlots(numSlots); - } - - double freeFraction = freeMemory / maxMemory; - double multiplier = Math.min( - // User-specified favorable size to spill to disk - modificationCoefficient, - // Approximate fraction of current data to spill in order to reach the - // fair fraction of free memory - (fairFreeMemoryFraction - freeFraction) / (1 - freeFraction)); - int count = Math.max((int) (numSlots * multiplier), 1); - if (count >= numSlots) { - LOG.warn("getNextOocPartitionCount: Memory capacity is " + - numSlots + " partitions, and OOC mechanism is " + - "trying to put " + count + " partitions to disk. This is not " + - "possible"); - // We should have at least one partition in memory - count = numSlots - 1; - if (count == 0) { - LOG.warn("It seems that size of one partition is too large for the " + - "available memory. Try to run the job with more partitions!"); - } - } - if (count != 0) { - oocPartitionCounts.push(count); - } - return count; - } - - /** - * Generate commands for out-of-core processor threads based on the - * (partitionId, memory foot-print) pairs we have on a particular type of data - * (either vertex buffer, edge buffer, or message buffer). - * - * @param pairs list of pairs (partitionId, estimated memory foot-print that - * is going to be reduced by offloading the particular data of a - * partition) - * @param commands commands to generate for out-of-core processor threads. a - * command is a partition id, for which the appropriate data - * should be flushed to disk. 
- * @return approximate amount of memory (in MB) that is going to be freed up - * after executing the generated commands - */ - private double createCommands(PairList<Integer, Integer> pairs, - BlockingQueue<Integer> commands) { - double usedMemory = 0; - if (pairs.getSize() != 0) { - PairList<Integer, Integer>.Iterator iterator = pairs.getIterator(); - // Generating commands for out-of-core processor threads to - // offload data as long as command queue has space. - while (iterator.hasNext() && - commands.remainingCapacity() > 0) { - iterator.next(); - commands.add(iterator.getCurrentFirst()); - // Having an approximation on the memory foot-print of data to offload - // helps us to know how much memory is going to become available by - // offloading the data without using internal functions to estimate - // free memory again. - usedMemory += iterator.getCurrentSecond() / 1024.0 / 1024.0; - } - } - return usedMemory; - } -}
