New out-of-core infrastructure (first patch including fixed out-of-core mechanism)

Summary: This is a re-design of the out-of-core mechanism. The new implementation
allows for much more intelligent partition scheduling and IO.

Test Plan:
mvn clean verify

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D54549


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fafecee7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fafecee7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fafecee7

Branch: refs/heads/trunk
Commit: fafecee712bc9b2ce8ef081d8170cdf99c48288b
Parents: c6af3ed
Author: Sergey Edunov <[email protected]>
Authored: Tue Mar 15 10:40:20 2016 -0700
Committer: Sergey Edunov <[email protected]>
Committed: Tue Mar 15 10:40:20 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/comm/ServerData.java |  213 +-
 .../giraph/comm/messages/MessageData.java       |   82 -
 .../comm/messages/SimpleMessageStore.java       |    3 +-
 .../out_of_core/DiskBackedMessageStore.java     |    6 +-
 .../primitives/IdByteArrayMessageStore.java     |    4 +-
 .../primitives/IdOneMessagePerVertexStore.java  |    3 +-
 .../primitives/IntByteArrayMessageStore.java    |    4 +-
 .../primitives/IntFloatMessageStore.java        |    3 +-
 .../primitives/LongDoubleMessageStore.java      |    3 +-
 .../long_id/LongAbstractMessageStore.java       |    3 +-
 .../NettyWorkerClientRequestProcessor.java      |    2 +-
 .../SendPartitionCurrentMessagesRequest.java    |    4 +-
 .../comm/requests/SendWorkerEdgesRequest.java   |    2 +-
 .../requests/SendWorkerMessagesRequest.java     |    4 +-
 .../SendWorkerOneMessageToManyRequest.java      |   13 +-
 .../org/apache/giraph/conf/GiraphConstants.java |   11 -
 .../apache/giraph/edge/AbstractEdgeStore.java   |   22 +-
 .../java/org/apache/giraph/edge/EdgeStore.java  |   21 +-
 .../apache/giraph/graph/GraphTaskManager.java   |    3 +-
 .../giraph/ooc/AdaptiveOutOfCoreEngine.java     |  284 ---
 .../apache/giraph/ooc/CheckMemoryCallable.java  |  478 ----
 .../giraph/ooc/DiskBackedPartitionStore.java    | 2149 ------------------
 .../apache/giraph/ooc/FixedOutOfCoreEngine.java |  147 ++
 .../giraph/ooc/FixedOutOfCoreIOScheduler.java   |  211 ++
 .../apache/giraph/ooc/JVMMemoryEstimator.java   |   45 -
 .../org/apache/giraph/ooc/MemoryEstimator.java  |   44 -
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |  187 +-
 .../apache/giraph/ooc/OutOfCoreIOCallable.java  |   90 +
 .../giraph/ooc/OutOfCoreIOCallableFactory.java  |  184 ++
 .../apache/giraph/ooc/OutOfCoreIOScheduler.java |  105 +
 .../giraph/ooc/OutOfCoreProcessorCallable.java  |  170 --
 .../giraph/ooc/data/DiskBackedEdgeStore.java    |  207 ++
 .../giraph/ooc/data/DiskBackedMessageStore.java |  297 +++
 .../ooc/data/DiskBackedPartitionStore.java      |  469 ++++
 .../giraph/ooc/data/MetaPartitionManager.java   |  947 ++++++++
 .../giraph/ooc/data/OutOfCoreDataManager.java   |  385 ++++
 .../apache/giraph/ooc/data/package-info.java    |   22 +
 .../org/apache/giraph/ooc/io/IOCommand.java     |   65 +
 .../giraph/ooc/io/LoadPartitionIOCommand.java   |   92 +
 .../giraph/ooc/io/StoreDataBufferIOCommand.java |   89 +
 .../ooc/io/StoreIncomingMessageIOCommand.java   |   60 +
 .../giraph/ooc/io/StorePartitionIOCommand.java  |   76 +
 .../org/apache/giraph/ooc/io/WaitIOCommand.java |   58 +
 .../org/apache/giraph/ooc/io/package-info.java  |   21 +
 .../apache/giraph/partition/PartitionData.java  |  116 -
 .../apache/giraph/partition/PartitionStore.java |  206 +-
 .../giraph/partition/SimplePartitionStore.java  |   46 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   16 +-
 .../apache/giraph/comm/RequestFailureTest.java  |    7 +-
 .../org/apache/giraph/comm/RequestTest.java     |   14 +-
 .../giraph/partition/TestPartitionStores.java   |   81 +-
 .../java/org/apache/giraph/TestOutOfCore.java   |   46 +-
 52 files changed, 4103 insertions(+), 3717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index b177446..be34820 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -31,20 +31,32 @@ import com.google.common.collect.Maps;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.EdgeStore;
+import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.ooc.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.FixedOutOfCoreEngine;
+import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.SimplePartitionStore;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
+
 /**
  * Anything that the server stores
  *
@@ -56,12 +68,26 @@ import org.apache.log4j.Logger;
 public class ServerData<I extends WritableComparable,
     V extends Writable, E extends Writable> {
   /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(ServerData.class);
+  private static final Logger LOG = Logger.getLogger(ServerData.class);
   /** Configuration */
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition store for this worker. */
   private volatile PartitionStore<I, V, E> partitionStore;
+  /** Edge store for this worker. */
+  private final EdgeStore<I, V, E> edgeStore;
+  /** Message store factory */
+  private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
+      messageStoreFactory;
+  /**
+   * Message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   */
+  private volatile MessageStore<I, Writable> incomingMessageStore;
+  /**
+   * Message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
+   */
+  private volatile MessageStore<I, Writable> currentMessageStore;
   /**
    * Map of partition ids to vertex mutations from other workers. These are
    * mutations that should be applied before execution of *current* super step.
@@ -81,7 +107,7 @@ public class ServerData<I extends WritableComparable,
       ConcurrentMap<I, VertexMutations<I, V, E>>>
       partitionMutations = Maps.newConcurrentMap();
   /**
-   * Holds aggregtors which current worker owns from current superstep
+   * Holds aggregators which current worker owns from current superstep
    */
   private final OwnerAggregatorServerData ownerAggregatorData;
   /**
@@ -100,6 +126,8 @@ public class ServerData<I extends WritableComparable,
 
   /** Job context (for progress) */
   private final Mapper<?, ?, ?, ?>.Context context;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
 
   /**
    * Constructor.
@@ -114,13 +142,32 @@ public class ServerData<I extends WritableComparable,
       Mapper<?, ?, ?, ?>.Context context) {
     this.serviceWorker = service;
     this.conf = conf;
+    this.messageStoreFactory = createMessageStoreFactory();
+    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
+    edgeStoreFactory.initialize(service, conf, context);
+    EdgeStore<I, V, E> inMemoryEdgeStore = edgeStoreFactory.newStore();
+    PartitionStore<I, V, E> inMemoryPartitionStore =
+        new SimplePartitionStore<I, V, E>(conf, context);
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
+      int maxPartitionsInMemory =
+          GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
+      if (maxPartitionsInMemory == 0) {
+        throw new IllegalStateException("ServerData: Adaptive " +
+            "out-of-core engine is not supported yet! Number of partitions in" +
+            " memory should be greater than 0.");
+      } else {
+        oocEngine = new FixedOutOfCoreEngine(conf, service,
+            maxPartitionsInMemory);
+      }
       partitionStore =
-          new DiskBackedPartitionStore<I, V, E>(conf, context,
-              getServiceWorker());
+          new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
+              conf, context, service, oocEngine);
+      edgeStore =
+          new DiskBackedEdgeStore<I, V, E>(inMemoryEdgeStore, conf, oocEngine);
     } else {
-      partitionStore =
-          new SimplePartitionStore<I, V, E>(conf, context, getServiceWorker());
+      partitionStore = inMemoryPartitionStore;
+      edgeStore = inMemoryEdgeStore;
+      oocEngine = null;
     }
     ownerAggregatorData = new OwnerAggregatorServerData(context);
     allAggregatorData = new AllAggregatorServerData(context, conf);
@@ -128,6 +175,42 @@ public class ServerData<I extends WritableComparable,
   }
 
   /**
+   * Decide which message store should be used for current application,
+   * and create the factory for that store
+   *
+   * @return Message store factory
+   */
+  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
+  createMessageStoreFactory() {
+    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
+        MESSAGE_STORE_FACTORY_CLASS.get(conf);
+
+    MessageStoreFactory messageStoreFactoryInstance =
+        ReflectionUtils.newInstance(messageStoreFactoryClass);
+    messageStoreFactoryInstance.initialize(serviceWorker, conf);
+
+    return messageStoreFactoryInstance;
+  }
+
+  /**
+   * Return the out-of-core engine for this worker.
+   *
+   * @return The out-of-core engine
+   */
+  public OutOfCoreEngine getOocEngine() {
+    return oocEngine;
+  }
+
+  /**
+   * Return the edge store for this worker.
+   *
+   * @return The edge store
+   */
+  public EdgeStore<I, V, E> getEdgeStore() {
+    return edgeStore;
+  }
+
+  /**
    * Return the partition store for this worker.
    *
    * @return The partition store
@@ -137,21 +220,103 @@ public class ServerData<I extends WritableComparable,
   }
 
   /**
+   * Get message store for incoming messages (messages which will be consumed
+   * in the next super step)
+   *
+   * @param <M> Message data
+   * @return Incoming message store
+   */
+  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
+    return (MessageStore<I, M>) incomingMessageStore;
+  }
+
+  /**
+   * Get message store for current messages (messages which we received in
+   * previous super step and which will be consumed in current super step)
+   *
+   * @param <M> Message data
+   * @return Current message store
+   */
+  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
+    return (MessageStore<I, M>) currentMessageStore;
+  }
+
+  /**
    * Re-initialize message stores.
    * Discards old values if any.
+   *
    * @throws IOException
    */
   public void resetMessageStores() throws IOException {
-    getPartitionStore().resetMessageStores();
-    currentWorkerToWorkerMessages =
-        Collections.synchronizedList(new ArrayList<Writable>());
-    incomingWorkerToWorkerMessages =
-        Collections.synchronizedList(new ArrayList<Writable>());
+    if (currentMessageStore != null) {
+      currentMessageStore.clearAll();
+      currentMessageStore = null;
+    }
+    if (incomingMessageStore != null) {
+      incomingMessageStore.clearAll();
+      incomingMessageStore = null;
+    }
+    prepareSuperstep();
   }
 
-  /** Prepare for next super step */
+  /** Prepare for next superstep */
   public void prepareSuperstep() {
-    partitionStore.prepareSuperstep();
+    if (currentMessageStore != null) {
+      try {
+        currentMessageStore.clearAll();
+      } catch (IOException e) {
+        throw new IllegalStateException(
+            "Failed to clear previous message store");
+      }
+    }
+
+    MessageStore<I, Writable> nextCurrentMessageStore;
+    MessageStore<I, Writable> nextIncomingMessageStore;
+    MessageStore<I, Writable> messageStore;
+
+    // First create the necessary in-memory message stores. If out-of-core
+    // mechanism is enabled, we wrap the in-memory message stores within
+    // disk-backed messages stores.
+    if (incomingMessageStore != null) {
+      nextCurrentMessageStore = incomingMessageStore;
+    } else {
+      messageStore = messageStoreFactory.newStore(
+          conf.getIncomingMessageClasses());
+      if (oocEngine == null) {
+        nextCurrentMessageStore = messageStore;
+      } else {
+        nextCurrentMessageStore = new DiskBackedMessageStore<>(
+            conf, messageStore,
+            conf.getIncomingMessageClasses().useMessageCombiner(),
+            serviceWorker.getSuperstep());
+      }
+    }
+
+    messageStore = messageStoreFactory.newStore(
+        conf.getOutgoingMessageClasses());
+    if (oocEngine == null) {
+      nextIncomingMessageStore = messageStore;
+    } else {
+      nextIncomingMessageStore = new DiskBackedMessageStore<>(
+          conf, messageStore,
+          conf.getOutgoingMessageClasses().useMessageCombiner(),
+          serviceWorker.getSuperstep() + 1);
+    }
+
+    // If out-of-core engine is enabled, we avoid overlapping of out-of-core
+    // decisions with change of superstep. This avoidance is done to simplify
+    // the design and reduce excessive use of synchronization primitives.
+    if (oocEngine != null) {
+      oocEngine.getSuperstepLock().writeLock().lock();
+    }
+    currentMessageStore = nextCurrentMessageStore;
+    incomingMessageStore = nextIncomingMessageStore;
+    if (oocEngine != null) {
+      oocEngine.getMetaPartitionManager().resetMessages();
+      oocEngine.getSuperstepLock().writeLock().unlock();
+    }
+    currentMessageStore.finalizeStore();
+
     currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
     incomingWorkerToWorkerMessages =
         Collections.synchronizedList(new ArrayList<Writable>());
@@ -252,8 +417,7 @@ public class ServerData<I extends WritableComparable,
         VertexMutations<I, V, E> vertexMutations = entry.getValue();
         Vertex<I, V, E> vertex = vertexResolver.resolve(vertexId,
             originalVertex, vertexMutations,
-            getPartitionStore().getCurrentMessageStore()
-                .hasMessagesForVertex(entry.getKey()));
+            getCurrentMessageStore().hasMessagesForVertex(entry.getKey()));
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("resolvePartitionMutations: Resolved vertex index " +
@@ -269,8 +433,7 @@ public class ServerData<I extends WritableComparable,
         } else if (originalVertex != null) {
           partition.removeVertex(vertexId);
           try {
-            getPartitionStore().getCurrentMessageStore()
-                .clearVertexMessages(vertexId);
+            getCurrentMessageStore().clearVertexMessages(vertexId);
           } catch (IOException e) {
             throw new IllegalStateException("resolvePartitionMutations: " +
                 "Caught IOException while clearing messages for a deleted " +
@@ -283,7 +446,7 @@ public class ServerData<I extends WritableComparable,
 
     // Keep track of vertices which are not here in the partition, but have
     // received messages
-    Iterable<I> destinations = getPartitionStore().getCurrentMessageStore().
+    Iterable<I> destinations = getCurrentMessageStore().
         getPartitionDestinationVertices(partitionId);
     if (!Iterables.isEmpty(destinations)) {
       for (I vertexId : destinations) {
@@ -307,4 +470,14 @@ public class ServerData<I extends WritableComparable,
       }
     }
   }
+
+  /**
+   * In case of async message store we have to wait for all messages
+   * to be processed before going into next superstep.
+   */
+  public void waitForComplete() {
+    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
+      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
deleted file mode 100644
index f974823..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageData.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Structure that keeps message information.
- *
- * @param <I> Vertex id
- */
-public interface MessageData<I extends WritableComparable> {
-  /**
-   * Get message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   *
-   * @param <M> Message data type
-   * @return Incoming message store
-   */
-  <M extends Writable> MessageStore<I, M> getIncomingMessageStore();
-
-  /**
-   * Get message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   *
-   * @param <M> Message data type
-   * @return Current message store
-   */
-  <M extends Writable> MessageStore<I, M> getCurrentMessageStore();
-
-  /**
-   * Re-initialize message stores.
-   * Discards old values if any.
-
-   * @throws IOException
-   */
-  void resetMessageStores() throws IOException;
-
-  /**
-   * Adds messages for partition to current message store
-   *
-   * @param <M> Message data type
-   * @param partitionId Id of partition
-   * @param messages    Collection of vertex ids and messages we want to add
-   * @throws IOException
-   */
-  <M extends Writable> void addPartitionCurrentMessages(
-      int partitionId, VertexIdMessages<I, M> messages)
-      throws IOException;
-
-  /**
-   * Adds messages for partition to incoming message store
-   *
-   * @param <M> Message data type
-   * @param partitionId Id of partition
-   * @param messages    Collection of vertex ids and messages we want to add
-   * @throws IOException
-   */
-  <M extends Writable> void addPartitionIncomingMessages(
-      int partitionId, VertexIdMessages<I, M> messages)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index a1d3625..054302d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -212,7 +212,8 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !map.get(partitionId).isEmpty();
+    ConcurrentMap<I, T> partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index b28d15b..0d7009b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -132,8 +132,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !Iterables
-        .isEmpty(getMessageStore(partitionId).getDestinationVertices());
+    PartitionDiskBackedMessageStore<I, M> partitionMessages =
+        getMessageStore(partitionId);
+    return partitionMessages != null && !Iterables
+        .isEmpty(partitionMessages.getDestinationVertices());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
index e1e7a3f..2e39857 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdByteArrayMessageStore.java
@@ -187,7 +187,9 @@ public class IdByteArrayMessageStore<I extends WritableComparable,
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return map.get(partitionId).size() != 0;
+    Basic2ObjectMap<I, DataInputOutput> partitionMessages =
+        map.get(partitionId);
+    return partitionMessages != null && partitionMessages.size() != 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
index b172d24..42fe992 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IdOneMessagePerVertexStore.java
@@ -163,7 +163,8 @@ public class IdOneMessagePerVertexStore<I extends WritableComparable,
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return map.get(partitionId).size() != 0;
+    Basic2ObjectMap<I, M> partitionMessages = map.get(partitionId);
+    return partitionMessages != null && partitionMessages.size() != 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index 2fbc35c..4c363f3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -178,7 +178,9 @@ public class IntByteArrayMessageStore<M extends Writable>
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !map.get(partitionId).isEmpty();
+    Int2ObjectOpenHashMap<DataInputOutput> partitionMessages =
+        map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 3186224..280f5b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -139,7 +139,8 @@ public class IntFloatMessageStore
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !map.get(partitionId).isEmpty();
+    Int2FloatOpenHashMap partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 6278c16..d8a3fde 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -140,7 +140,8 @@ public class LongDoubleMessageStore
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !map.get(partitionId).isEmpty();
+    Long2DoubleOpenHashMap partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
index d8e5246..a0c977e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -102,7 +102,8 @@ public abstract class LongAbstractMessageStore<M extends Writable, T>
 
   @Override
   public boolean hasMessagesForPartition(int partitionId) {
-    return !map.get(partitionId).isEmpty();
+    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
+    return partitionMessages != null && !partitionMessages.isEmpty();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index e9b072a..1cd1bd6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -210,7 +210,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
                                      Partition<I, V, E> partition) {
     final int partitionId = partition.getId();
     MessageStore<I, Writable> messageStore =
-        serverData.getPartitionStore().getCurrentMessageStore();
+        serverData.getCurrentMessageStore();
     ByteArrayVertexIdMessages<I, Writable> vertexIdMessages =
         new ByteArrayVertexIdMessages<I, Writable>(
             configuration.createOutgoingMessageValueFactory());

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
index ab66aa3..b59d0cf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
@@ -86,8 +86,8 @@ public class SendPartitionCurrentMessagesRequest<I extends WritableComparable,
   @Override
   public void doRequest(ServerData<I, V, E> serverData) {
     try {
-      serverData.getPartitionStore()
-          .addPartitionCurrentMessages(partitionId, vertexIdMessageMap);
+      serverData.<M>getCurrentMessageStore().addPartitionMessages(partitionId,
+          vertexIdMessageMap);
     } catch (IOException e) {
       throw new RuntimeException("doRequest: Got IOException ", e);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index aeda197..00cf6ef 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -69,7 +69,7 @@ public class SendWorkerEdgesRequest<I extends WritableComparable,
         iterator = partitionVertexData.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      serverData.getPartitionStore()
+      serverData.getEdgeStore()
           .addPartitionEdges(iterator.getCurrentFirst(),
               iterator.getCurrentSecond());
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
index e9be327..6953998 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
@@ -71,8 +71,8 @@ public class SendWorkerMessagesRequest<I extends WritableComparable,
     while (iterator.hasNext()) {
       iterator.next();
       try {
-        serverData.getPartitionStore().
-            addPartitionIncomingMessages(iterator.getCurrentFirst(),
+        serverData.getIncomingMessageStore().
+            addPartitionMessages(iterator.getCurrentFirst(),
                 iterator.getCurrentSecond());
       } catch (IOException e) {
         throw new RuntimeException("doRequest: Got IOException ", e);

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
index aeb1b1d..f8d0473 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
@@ -93,11 +94,10 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
   @Override
   public void doRequest(ServerData serverData) {
     try {
-      if (serverData.getPartitionStore().getIncomingMessageStore()
-          .isPointerListEncoding()) {
+      MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
+      if (messageStore.isPointerListEncoding()) {
         // if message store is pointer list based then send data as is
-        serverData.getPartitionStore()
-            .addPartitionIncomingMessages(-1, oneMessageToManyIds);
+        messageStore.addPartitionMessages(-1, oneMessageToManyIds);
       } else { // else split the data per partition and send individually
         CentralizedServiceWorker<I, ?, ?> serviceWorker =
             serverData.getServiceWorker();
@@ -144,9 +144,8 @@ public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
         for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
             partitionIdMsgs.entrySet()) {
           if (!idMsgs.getValue().isEmpty()) {
-            serverData.getPartitionStore()
-                .addPartitionIncomingMessages(idMsgs.getKey(),
-                    idMsgs.getValue());
+            serverData.getIncomingMessageStore().addPartitionMessages(
+                idMsgs.getKey(), idMsgs.getValue());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8ad3767..4787d37 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,8 +71,6 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
-import org.apache.giraph.ooc.JVMMemoryEstimator;
-import org.apache.giraph.ooc.MemoryEstimator;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
@@ -976,15 +974,6 @@ public interface GiraphConstants {
       new BooleanConfOption("giraph.useOutOfCoreGraph", false,
           "Enable out-of-core graph.");
 
-  /**
-   * Memory estimator class used in adaptive out-of-core mechanism for deciding
-   * when data should go to disk.
-   */
-  ClassConfOption<MemoryEstimator> OUT_OF_CORE_MEM_ESTIMATOR =
-      ClassConfOption.create("giraph.outOfCoreMemoryEstimator",
-          JVMMemoryEstimator.class, MemoryEstimator.class,
-          "Memory estimator class used for out-of-core decisions");
-
   /** Number of threads participating in swapping graph/messages to disk. */
   IntConfOption NUM_OOC_THREADS =
       new IntConfOption("giraph.numOutOfCoreThreads", 1,

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 9609047..0f3d668 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -42,6 +42,8 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * Basic implementation of edges store, extended this to easily define simple
  * and primitive edge stores
@@ -157,7 +159,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
   getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
 
   @Override
-  public boolean hasPartitionEdges(int partitionId) {
+  public boolean hasEdgesForPartition(int partitionId) {
     return transientEdges.containsKey(partitionId);
   }
 
@@ -165,21 +167,21 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
   public void writePartitionEdgeStore(int partitionId, DataOutput output)
       throws IOException {
     Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
-    output.writeInt(edges.size());
-    for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
-      writeVertexKey(edge.getKey(), output);
-      edge.getValue().write(output);
+    if (edges != null) {
+      output.writeInt(edges.size());
+      for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
+        writeVertexKey(edge.getKey(), output);
+        edge.getValue().write(output);
+      }
     }
   }
 
   @Override
   public void readPartitionEdgeStore(int partitionId, DataInput input)
       throws IOException {
-    if (transientEdges.containsKey(partitionId)) {
-      throw new IllegalStateException("readPartitionEdgeStore: reading a " +
-          "partition that is already there in the partition store " +
-          "(impossible)");
-    }
+    checkState(!transientEdges.containsKey(partitionId),
+        "readPartitionEdgeStore: reading a partition that is already there in" +
+            " the partition store (impossible)");
     Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
     int numEntries = input.readInt();
     for (int i = 0; i < numEntries; ++i) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 1c9d85f..f485042 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -46,21 +46,12 @@ public interface EdgeStore<I extends WritableComparable,
 
   /**
    * Move all edges from temporary storage to their source vertices.
-   * Note: this method is not thread-safe.
+   * Note: this method is not thread-safe and is called once all vertices and
+   * edges are read in INPUT_SUPERSTEP.
    */
   void moveEdgesToVertices();
 
   /**
-   * Whether the store contains edges for the given partition.
-   * Note: This method is thread-safe
-   *
-   * @param partitionId Partition id under query
-   * @return true if the store has any edge for the given partition, false
-   *         otherwise
-   */
-  boolean hasPartitionEdges(int partitionId);
-
-  /**
    * Deserialize the edges of a given partition, and removes the associated data
    * from the store.
    * Note: This method is not thread-safe (i.e. should not be called for the
@@ -84,4 +75,12 @@ public interface EdgeStore<I extends WritableComparable,
    */
   void readPartitionEdgeStore(int partitionId, DataInput input)
       throws IOException;
+
+  /**
+   * Check if edge store has edge for a given partition
+   *
+   * @param partitionId Id of partition
+   * @return True iff edge store have messages for the given partition
+   */
+  boolean hasEdgesForPartition(int partitionId);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3c09957..62a87de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -332,8 +332,7 @@ end[PURE_YARN]*/
       prepareForSuperstep(graphState);
       context.progress();
       MessageStore<I, Writable> messageStore =
-          serviceWorker.getServerData().getPartitionStore()
-              .getCurrentMessageStore();
+          serviceWorker.getServerData().getCurrentMessageStore();
       int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
       int numThreads = Math.min(numComputeThreads, numPartitions);
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
deleted file mode 100644
index d5b0e20..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adaptive out-of-core mechanism. This mechanism spawns two types of threads:
- *   1) check-memory thread, which periodically monitors the amount of available
- *      memory and decides whether data should go on disk. This threads is
- *      basically the brain behind the out-of-core mechanism, commands
- *      "out-of-core processor threads" (type 2 thread below) to move
- *      appropriate data to disk,
- *   2) out-of-core processor threads. This is a team of threads responsible for
- *      offloading appropriate data to disk. "check-memory thread" decides on
- *      which data should go to disk, and "out-of-core processor threads" do the
- *      offloading.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge data
- */
-public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements
-    OutOfCoreEngine<I, V, E> {
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(AdaptiveOutOfCoreEngine.class);
-
-  // ---- Synchronization Variables ----
-  /** Barrier to coordinate check-memory and OOC-processing threads */
-  private final CyclicBarrier gate;
-  /**
-   * Signal to determine whether OOC processing threads are done processing OOC
-   * requests
-   */
-  private final CyclicBarrier doneOocSignal;
-  /** Signal to determine whether the computation is terminated */
-  private final CountDownLatch doneCompute;
-  /** Finisher signal to OOC processing threads */
-  private volatile boolean done;
-
-  // ---- OOC Commands ----
-  /**
-   * List of partitions that are on disk, and their loaded *vertices* during
-   * INPUT_SUPERSTEP are ready to be flushed to disk
-   */
-  private final BlockingQueue<Integer> partitionsWithInputVertices;
-  /**
-   * List of partitions that are on disk, and their loaded *edges* during
-   * INPUT_SUPERSTEP are ready to be flushed to disk
-   */
-  private final BlockingQueue<Integer> partitionsWithInputEdges;
-  /**
-   * List of partitions that are on disk, and their message buffers (either
-   * messages for current superstep, or incoming messages for next superstep)
-   * are ready to be flushed to disk
-   */
-  private final BlockingQueue<Integer> partitionsWithPendingMessages;
-  /** Number of partitions to be written to disk */
-  private final AtomicInteger numPartitionsToSpill;
-
-  /** Executor service for check memory thread */
-  private ExecutorService checkMemoryExecutor;
-  /** Executor service for out-of-core processor threads */
-  private ExecutorService outOfCoreProcessorExecutor;
-
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Worker */
-  private final CentralizedServiceWorker<I, V, E> serviceWorker;
-
-  /** Cached value for number of out-of-core threads specified by user */
-  private int numOocThreads;
-
-  /** Result of check-memory thread (to be checked for graceful termination) */
-  private Future<Void> checkMemoryResult;
-  /**
-   * Results of out-of-core processor threads (to be checked for graceful
-   * termination)
-   */
-  private List<Future<Void>> oocProcessorResults;
-
-  /**
-   * Creates an instance of adaptive mechanism
-   * @param conf Configuration
-   * @param serviceWorker Worker service
-   */
-  public AdaptiveOutOfCoreEngine(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.conf = conf;
-    this.serviceWorker = serviceWorker;
-
-    this.numOocThreads = conf.getNumOocThreads();
-    this.gate = new CyclicBarrier(numOocThreads + 1);
-    this.doneOocSignal = new CyclicBarrier(numOocThreads + 1);
-    this.doneCompute = new CountDownLatch(1);
-    this.done = false;
-    this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100);
-    this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100);
-    this.partitionsWithPendingMessages = new ArrayBlockingQueue<Integer>(100);
-    this.numPartitionsToSpill = new AtomicInteger(0);
-  }
-
-  @Override
-  public void initialize() {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("initialize: initializing out-of-core engine");
-    }
-    CallableFactory<Void> checkMemoryCallableFactory =
-      new CallableFactory<Void>() {
-        @Override
-        public Callable<Void> newCallable(int callableId) {
-          return new CheckMemoryCallable<I, V, E>(
-              AdaptiveOutOfCoreEngine.this, conf, serviceWorker);
-        }
-      };
-    checkMemoryExecutor = Executors.newSingleThreadExecutor(
-        ThreadUtils.createThreadFactory("check-memory"));
-    checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>(
-        checkMemoryCallableFactory.newCallable(0)));
-
-    CallableFactory<Void> outOfCoreProcessorCallableFactory =
-      new CallableFactory<Void>() {
-        @Override
-        public Callable<Void> newCallable(int callableId) {
-          return new OutOfCoreProcessorCallable<I, V, E>(
-              AdaptiveOutOfCoreEngine.this, serviceWorker);
-        }
-      };
-    outOfCoreProcessorExecutor =
-        Executors.newFixedThreadPool(numOocThreads,
-            ThreadUtils.createThreadFactory("ooc-%d"));
-    oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads);
-    for (int i = 0; i < numOocThreads; ++i) {
-      Future<Void> future = outOfCoreProcessorExecutor.submit(
-          new LogStacktraceCallable<>(
-              outOfCoreProcessorCallableFactory.newCallable(i)));
-      oocProcessorResults.add(future);
-    }
-  }
-
-  @Override
-  public void shutdown() {
-    doneCompute.countDown();
-    checkMemoryExecutor.shutdown();
-    if (checkMemoryResult.isCancelled()) {
-      throw new IllegalStateException(
-          "shutdown: memory check thread did not " + "terminate gracefully!");
-    }
-    outOfCoreProcessorExecutor.shutdown();
-    for (int i = 0; i < numOocThreads; ++i) {
-      if (oocProcessorResults.get(i).isCancelled()) {
-        throw new IllegalStateException("shutdown: out-of-core processor " +
-            "thread " + i + " did not terminate gracefully.");
-      }
-    }
-  }
-
-  /**
-   * @return the latch that signals whether the whole computation is done
-   */
-  public CountDownLatch getDoneCompute() {
-    return doneCompute;
-  }
-
-  /**
-   * @return whether the computation is done
-   */
-  public boolean isDone() {
-    return done;
-  }
-
-  /**
-   * @return list of partitions that have large enough buffers of vertices read
-   *         in INPUT_SUPERSTEP.
-   */
-  public BlockingQueue<Integer> getPartitionsWithInputVertices() {
-    return partitionsWithInputVertices;
-  }
-
-  /**
-   * @return list of partitions that have large enough buffers of edges read
-   *         in INPUT_SUPERSTEP.
-   */
-  public BlockingQueue<Integer> getPartitionsWithInputEdges() {
-    return partitionsWithInputEdges;
-  }
-
-  /**
-   * @return list of partitions that have large enough message buffers.
-   */
-  public BlockingQueue<Integer> getPartitionsWithPendingMessages() {
-    return partitionsWithPendingMessages;
-  }
-
-  /**
-   * @return number of partitions to spill to disk
-   */
-  public AtomicInteger getNumPartitionsToSpill() {
-    return numPartitionsToSpill;
-  }
-
-  /**
-   * Wait on gate with which OOC processor threads are notified to execute
-   * commands provided by brain (memory-check thread).
-   *
-   * @throws BrokenBarrierException
-   * @throws InterruptedException
-   */
-  public void waitOnGate() throws BrokenBarrierException, InterruptedException {
-    gate.await();
-  }
-
-  /**
-   * Reset the gate for reuse.
-   */
-  public void resetGate() {
-    gate.reset();
-  }
-
-  /**
-   * Wait on signal from all OOC processor threads that the offloading of data
-   * is complete.
-   *
-   * @throws BrokenBarrierException
-   * @throws InterruptedException
-   */
-  public void waitOnOocSignal()
-      throws BrokenBarrierException, InterruptedException {
-    doneOocSignal.await();
-  }
-
-  /**
-   * Reset the completion signal of OOC processor threads for reuse.
-   */
-  public void resetOocSignal() {
-    doneOocSignal.reset();
-  }
-
-  /**
-   * Set the computation as done (i.e. setting the state that determines the
-   * whole computation is done).
-   */
-  public void setDone() {
-    done = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
deleted file mode 100644
index 2b2c990..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.bsp.BspService;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.FloatConfOption;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.Stack;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Adaptive out-of-core mechanism brain. This class provides one thread per
- * worker that periodically checks the free memory on the worker and compares it
- * with total amount of memory given to that worker to run the job. The period
- * at which the thread checks for the memory is specified by the user. Also,
- * user can specify the fraction of memory where anytime free memory is less
- * than that fraction of total memory, actions would be taken to free up space
- * in memory (this fraction is called LOW_FREE_MEMORY_FRACTION). Also, user can
- * specify another fraction of available memory where memory pressure is fair
- * and some of the data on disk (if there is any) can be brought back to memory
- * again (this fraction is called FAIR_FREE_MEMORY_FRACTION).
- *
- * In the adaptive out-of-core mechanism, if amount of free memory becomes less
- * than LOW_FREE_MEMORY_FRACTION, some data are being considered as potentials
- * to transfer to disk. These data can be in the following categories:
- *   1) Vertex buffers read in INPUT_SUPERSTEP. These are vertex input splits
- *      read for a partition that is out-of-core and PartitionStore holds these
- *      vertex buffers in in-memory buffers (and postpone their merge with the
- *      actual partition until the partition is loaded back in memory).
- *   2) Edge buffers read in INPUT_SUPERSTEP. These are similar buffers to
- *      vertex buffers, but they keep edge data in INPUT_SUPERSTEP.
- *   3) Partitions.
- *
- * This brain prefers the first two categories in INPUT_SUPERSTEP as long as
- * size of buffers are large enough that it is worth writing them to disk. In
- * case where brain decides on spilling partitions to disk, the brain decides
- * only on the "number of partitions" to spill to disk. It is "out-of-core
- * processor threads" responsibility to find that many partitions to spill to
- * disk. The number of partitions to spill is a fraction of number of partitions
- * currently in memory. It is recommended that this fraction be equal to
- * subtraction of LOW_FREE_MEMORY_FRACTION from FAIR_FREE_MEMORY_FRACTION. Here
- * is an example to clarify on this recommendation. Assume
- * LOW_FREE_MEMORY_FRACTION is 5% and FAIR_FREE_MEMORY_FRACTION is 15%. Also
- * assume that the partitions are similar in their memory footprint (which is a
- * valid assumption for most of the partitioning techniques). If free memory is
- * a bit less than 5% of total available memory, if we offload 10%
- * (15% - 5% = 10%), then the amount of free memory will increase to a bit less
- * than 15% of total available memory.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public class CheckMemoryCallable<I extends WritableComparable,
-    V extends Writable, E extends Writable> implements Callable<Void> {
-  /**
-   * Lowest free memory fraction to start doing necessary actions to go
-   * out-of-core.
-   */
-  public static final FloatConfOption LOW_FREE_MEMORY_FRACTION =
-      new FloatConfOption("giraph.lowFreeMemoryFraction", 0.1f,
-          "If free memory fraction goes below this value, GC is called " +
-              "manually and necessary actions are taken if we have to go " +
-              "out-of-core");
-  /**
-   * Expected memory fraction to achieve after detecting that the job is running
-   * low in memory. Basically, this memory fraction is the target to achieve
-   * once we decide to offload data on disk.
-   */
-  public static final FloatConfOption MID_FREE_MEMORY_FRACTION =
-      new FloatConfOption("giraph.midFreeMemoryFraction", 0.15f,
-          "Once out-of-core mechanism decides to offload data on disk, it " +
-              "offloads data on disk until free memory fraction reaches this " +
-              "fraction.");
-  /**
-   * Memory fraction at which the job gets the best performance considering the
-   * choice of GC strategy. It means, if the amount of free memory is more than
-   * this fraction we will not see severe amount of GC calls.
-   */
-  public static final FloatConfOption FAIR_FREE_MEMORY_FRACTION =
-      new FloatConfOption("giraph.fairFreeMemoryFraction", 0.3f,
-          "The fraction of free memory at which the job shows the best GC " +
-              "performance. This fraction might be dependent on GC strategy " +
-              "used in running the job, but generally 0.3 is a reasonable " +
-              "fraction for most strategies.");
-  /**
-   * Memory fraction at which the job has enough space so we can back off from
-   * the last out-of-core decision, i.e. lazily bringing the last bunch of data
-   * spilled to disk.
-   */
-  public static final FloatConfOption HIGH_FREE_MEMORY_FRACTION =
-      new FloatConfOption("giraph.highFreeMemoryFraction", 0.4f,
-          "Once free memory reaches at this fraction, last out-of-core " +
-              "decision is lazily rolled back, i.e. we back off from " +
-              "out-of-core.");
-  /** Time interval at which checking memory is done periodically. */
-  public static final IntConfOption CHECK_MEMORY_INTERVAL =
-      new IntConfOption("giraph.checkMemoryInterval", 5000,
-          "Time interval (in milliseconds) at which checking memory is done" +
-              " to decide if there should be any out-of-core action.");
-  /** Coefficient by which the number of partitions in memory changes. */
-  public static final FloatConfOption OOC_GRAPH_MODIFICATION_COEFFICIENT =
-      new FloatConfOption("giraph.graphPartitionModificationCoefficient", 0.3f,
-          "If we decide to go out-of-core or back-off from out-of-core, this " +
-              "is the multiplier by which the number of in-memory partitions" +
-              "will change.");
-
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(CheckMemoryCallable.class);
-
-  /** Worker */
-  private final CentralizedServiceWorker<I, V, E> serviceWorker;
-  /** Partition store */
-  private final DiskBackedPartitionStore<I, V, E> partitionStore;
-
-  // ---- Cached Config Values ----
-  /** Cached value of LOW_FREE_MEMORY_FRACTION */
-  private float lowFreeMemoryFraction;
-  /** Cached value for MID_FREE_MEMORY_FRACTION */
-  private float midFreeMemoryFraction;
-  /** Cached value of FAIR_FREE_MEMORY_FRACTION */
-  private float fairFreeMemoryFraction;
-  /** Cached value for HIGH_FREE_MEMORY_FRACTION */
-  private float highFreeMemoryFraction;
-  /** Cached value of CHECK_MEMORY_INTERVAL */
-  private int checkInterval;
-  /** Cached value for OOC_GRAPH_MODIFICATION_COEFFICIENT */
-  private float modificationCoefficient;
-
-  /** List of counts of number of partitions every time we shrink the store */
-  private Stack<Integer> oocPartitionCounts;
-  /** Memory estimator instance */
-  private final MemoryEstimator memoryEstimator;
-  /** Adaptive out-of-core engine */
-  private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine;
-
-  /**
-   * Constructor for check-memory thread.
-   *
-   * @param oocEngine out-of-core engine
-   * @param conf job configuration
-   * @param serviceWorker worker service
-   */
-  public CheckMemoryCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine,
-      ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.oocEngine = oocEngine;
-    this.serviceWorker = serviceWorker;
-    this.partitionStore =
-        (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore();
-
-    this.oocPartitionCounts = new Stack<>();
-
-    this.lowFreeMemoryFraction = LOW_FREE_MEMORY_FRACTION.get(conf);
-    this.midFreeMemoryFraction = MID_FREE_MEMORY_FRACTION.get(conf);
-    this.fairFreeMemoryFraction = FAIR_FREE_MEMORY_FRACTION.get(conf);
-    this.highFreeMemoryFraction = HIGH_FREE_MEMORY_FRACTION.get(conf);
-    this.checkInterval = CHECK_MEMORY_INTERVAL.get(conf);
-    this.modificationCoefficient = OOC_GRAPH_MODIFICATION_COEFFICIENT.get(conf);
-
-    memoryEstimator = ReflectionUtils
-        .newInstance(GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR.get(conf));
-  }
-
-  /**
-   * Checks whether the available free memory is enough for an efficient
-   * execution. If memory is limited, offload partitions to disk.
-   * Also, if available memory is more than a threshold, loads partitions from
-   * disk (if there is any) to memory.
-   */
-  @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings("DM_GC")
-  public Void call() {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("call: check-memory thread started.");
-    }
-    memoryEstimator.initialize(serviceWorker);
-    CountDownLatch doneCompute = oocEngine.getDoneCompute();
-    while (doneCompute.getCount() != 0) {
-      double maxMemory = memoryEstimator.maxMemoryMB();
-      double freeMemory = memoryEstimator.freeMemoryMB();
-      boolean gcDone = false;
-      if (freeMemory < lowFreeMemoryFraction * maxMemory) {
-        // This is typically a bad scenario where previous GCs were not
-        // successful to free up enough memory. If we keep staying in this
-        // situation, usually, either the computation slows down dramatically,
-        // or the computation throws OOM error. So, we do GC manually, and
-        // make sure that out-of-core is the solution to get out of this
-        // situation.
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: Memory is very limited now. Calling GC manually. " +
-              String.format("freeMemory = %.2fMB", freeMemory));
-        }
-        long gcStartTime = System.currentTimeMillis();
-        System.gc();
-        gcDone = true;
-        freeMemory = memoryEstimator.freeMemoryMB();
-        if (LOG.isInfoEnabled()) {
-          LOG.info("call: GC is done. " + String
-              .format("GC time = %.2f sec, and freeMemory = %.2fMB",
-                  (System.currentTimeMillis() - gcStartTime) / 1000.0,
-                  freeMemory));
-        }
-      }
-      // If we have enough memory, we roll back the latest shrink in number of
-      // partition slots.
-      // If we do not have enough memory, but we are not in a bad scenario
-      // either, we gradually increase the number of partition slots in memory.
-      // If we are low in free memory, we first push unnecessary data to disk
-      // and then push some partitions to disk if necessary.
-      int numInMemory = partitionStore.getNumPartitionInMemory();
-      int maxInMemory = partitionStore.getNumPartitionSlots();
-      int numInTotal = partitionStore.getNumPartitions();
-      if (freeMemory > highFreeMemoryFraction * maxMemory) {
-        if (numInMemory >= maxInMemory && !oocPartitionCounts.isEmpty()) {
-          partitionStore.increasePartitionSlots(oocPartitionCounts.pop());
-        }
-      } else if (freeMemory > fairFreeMemoryFraction * maxMemory) {
-        // Only gradually increase the number of partition slots if all slots
-        // are already used, and we have things out-of-core
-        if (!oocPartitionCounts.isEmpty() || maxInMemory < numInTotal) {
-          if (numInMemory >= maxInMemory) {
-            partitionStore.increasePartitionSlots(1);
-            if (!oocPartitionCounts.isEmpty()) {
-              int num = oocPartitionCounts.pop();
-              if (num > 1) {
-                oocPartitionCounts.push(num - 1);
-              }
-            }
-          }
-        }
-      } else if (gcDone && freeMemory < midFreeMemoryFraction * maxMemory) {
-        BlockingQueue<Integer> partitionsWithInputVertices =
-            oocEngine.getPartitionsWithInputVertices();
-        BlockingQueue<Integer> partitionsWithInputEdges =
-            oocEngine.getPartitionsWithInputEdges();
-        BlockingQueue<Integer> partitionsWithPendingMessages =
-            oocEngine.getPartitionsWithPendingMessages();
-        AtomicInteger numPartitionsToSpill =
-            oocEngine.getNumPartitionsToSpill();
-        while (freeMemory < midFreeMemoryFraction * maxMemory) {
-          // Offload input vertex buffer of OOC partitions if we are in
-          // INPUT_SUPERSTEP
-          if (serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
-            // List of pairs (partitionId, approximate memory footprint of
-            // vertex buffers of that partition).
-            PairList<Integer, Integer> pairs =
-                partitionStore.getOocPartitionIdsWithPendingInputVertices();
-            freeMemory -= createCommands(pairs, partitionsWithInputVertices);
-          }
-
-          // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP
-          if (freeMemory < midFreeMemoryFraction * maxMemory &&
-              serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
-            PairList<Integer, Integer> pairs =
-                partitionStore.getOocPartitionIdsWithPendingInputEdges();
-            freeMemory -= createCommands(pairs, partitionsWithInputEdges);
-          }
-
-          // Offload message buffers of OOC partitions if we are still low in
-          // free memory
-          if (freeMemory < midFreeMemoryFraction * maxMemory) {
-            PairList<Integer, Integer> pairs =
-                partitionStore.getOocPartitionIdsWithPendingMessages();
-            freeMemory -= createCommands(pairs, partitionsWithPendingMessages);
-          }
-
-          // Offload partitions if we are still low in free memory
-          if (freeMemory < midFreeMemoryFraction * maxMemory) {
-            numPartitionsToSpill
-                .set(getNextOocPartitionCount(freeMemory, maxMemory));
-          }
-
-          if (!partitionsWithInputVertices.isEmpty() ||
-              !partitionsWithInputEdges.isEmpty() ||
-              !partitionsWithPendingMessages.isEmpty() ||
-              numPartitionsToSpill.get() != 0) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("call: signal out-of-core processor threads to start " +
-                  "offloading. These threads will spill vertex buffers of " +
-                  partitionsWithInputVertices.size() + " partitions, edge " +
-                  "buffers of " + partitionsWithInputEdges.size() +
-                  " partitions, pending message buffers of " +
-                  partitionsWithPendingMessages.size() + " partitions, and " +
-                  numPartitionsToSpill.get() + " whole partitions");
-            }
-            // Opening the gate for OOC processing threads to start spilling
-            // data on disk
-            try {
-              oocEngine.waitOnGate();
-            } catch (InterruptedException e) {
-              throw new IllegalStateException("call: Caught " +
-                  "InterruptedException while opening the gate for OOC " +
-                  "processing threads");
-            } catch (BrokenBarrierException e) {
-              throw new IllegalStateException("call: Caught " +
-                  "BrokenBarrierException while opening the gate for OOC " +
-                  "processing threads");
-            }
-            oocEngine.resetGate();
-
-            if (LOG.isInfoEnabled()) {
-              LOG.info("call: waiting on OOC processors to finish offloading " +
-                  "data to disk");
-            }
-            // Wait until all OOC processing threads are done swapping data to
-            // disk
-            try {
-              oocEngine.waitOnOocSignal();
-            } catch (InterruptedException e) {
-              throw new IllegalStateException("call: Caught " +
-                  "InterruptedException. Looks like memory check thread is " +
-                  "interrupted while waiting on OOC processing threads.");
-            } catch (BrokenBarrierException e) {
-              throw new IllegalStateException("call: Caught " +
-                  "BrokenBarrierException. Looks like some OOC processing " +
-                  "threads  broke while writing data on disk.");
-            }
-            oocEngine.resetOocSignal();
-          }
-
-          gcDone = false;
-          long gcStartTime = 0;
-          if (freeMemory < midFreeMemoryFraction * maxMemory) {
-            // Calling GC manually to actually free up the memory for data that
-            // is offloaded to disk
-            if (LOG.isInfoEnabled()) {
-              LOG.info("call: calling GC manually to free up space for " +
-                  "recently offloaded data.");
-            }
-            gcStartTime = System.currentTimeMillis();
-            System.gc();
-            gcDone = true;
-          }
-          freeMemory = memoryEstimator.freeMemoryMB();
-          if (LOG.isInfoEnabled()) {
-            LOG.info("call: " +
-                (gcDone ?
-                    ("GC is done. " + String.format("GC time = %.2f sec. ",
-                        (System.currentTimeMillis() - gcStartTime) / 1000.0)) :
-                    "") +
-                "Finished offloading data to disk. " +
-                String.format("freeMemory = %.2fMB", freeMemory));
-          }
-        }
-      }
-
-      // Either wait for the computation to be done, or the time interval passes
-      try {
-        doneCompute.await(checkInterval, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        throw new IllegalStateException("call: Caught InterruptedException " +
-            "while waiting for computation to be done and/or " + checkInterval +
-            "milliseconds passes.");
-      }
-    }
-
-    // Setting 'done' before the gate here and checking 'done' in OOC processing
-    // threads after the gate, guarantees that OOC processing threads see the
-    // new value of done and terminate gracefully.
-    oocEngine.setDone();
-    try {
-      oocEngine.waitOnGate();
-    } catch (InterruptedException e) {
-      throw new IllegalStateException("call: Caught InterruptedException " +
-          "while waiting for the last time on gate in the current superstep");
-    } catch (BrokenBarrierException e) {
-      throw new IllegalStateException("call: Caught BrokenBarrierException " +
-          "while waiting for the last time on gate in the current superstep");
-    }
-    return null;
-  }
-
-  /**
-   * Returns the number of partitions that should go out-of-core at this point.
-   *
-   * @return number of partitions that should go out-of-core
-   * @param freeMemory amount of free memory (in MB)
-   * @param maxMemory amount of max memory (in MB)
-   */
-  private int getNextOocPartitionCount(double freeMemory, double maxMemory) {
-    int numSlots = partitionStore.getNumPartitionSlots();
-    if (numSlots == Integer.MAX_VALUE) {
-      numSlots = partitionStore.getNumPartitions();
-      partitionStore.setNumPartitionSlots(numSlots);
-    }
-
-    double freeFraction = freeMemory / maxMemory;
-    double multiplier = Math.min(
-        // User-specified favorable size to spill to disk
-        modificationCoefficient,
-        // Approximate fraction of current data to spill in order to reach the
-        // fair fraction of free memory
-        (fairFreeMemoryFraction - freeFraction) / (1 - freeFraction));
-    int count = Math.max((int) (numSlots * multiplier), 1);
-    if (count >= numSlots) {
-      LOG.warn("getNextOocPartitionCount: Memory capacity is " +
-          numSlots + " partitions, and OOC mechanism is " +
-          "trying to put " + count + " partitions to disk. This is not " +
-          "possible");
-      // We should have at least one partition in memory
-      count = numSlots - 1;
-      if (count == 0) {
-        LOG.warn("It seems that size of one partition is too large for the " +
-            "available memory.  Try to run the job with more partitions!");
-      }
-    }
-    if (count != 0) {
-      oocPartitionCounts.push(count);
-    }
-    return count;
-  }
-
-  /**
-   * Generate commands for out-of-core processor threads based on the
-   * (partitionId, memory foot-print) pairs we have on a particular type of data
-   * (either vertex buffer, edge buffer, or message buffer).
-   *
-   * @param pairs list of pairs (partitionId, estimated memory foot-print that
-   *              is going to be reduced by offloading the particular data of a
-   *              partition)
-   * @param commands commands to generate for out-of-core processor threads. a
-   *                 command is a partition id, for which the appropriate data
-   *                 should be flushed to disk.
-   * @return approximate amount of memory (in MB) that is going to be freed up
-   *         after executing the generated commands
-   */
-  private double createCommands(PairList<Integer, Integer> pairs,
-      BlockingQueue<Integer> commands) {
-    double usedMemory = 0;
-    if (pairs.getSize() != 0) {
-      PairList<Integer, Integer>.Iterator iterator = pairs.getIterator();
-      // Generating commands for out-of-core processor threads to
-      // offload data as long as command queue has space.
-      while (iterator.hasNext() &&
-          commands.remainingCapacity() > 0) {
-        iterator.next();
-        commands.add(iterator.getCurrentFirst());
-        // Having an approximation on the memory foot-print of data to offload
-        // helps us to know how much memory is going to become available by
-        // offloading the data without using internal functions to estimate
-        // free memory again.
-        usedMemory += iterator.getCurrentSecond() / 1024.0 / 1024.0;
-      }
-    }
-    return usedMemory;
-  }
-}

Reply via email to