[GIRAPH-1022] Adaptive out-of-core mechanism for input superstep and graph

Summary: This code adds the ability to adaptively control the out-of-core 
mechanism for graph data structure at run-time during input/output superstep 
and computation superstep. Basically, the implemented mechanism monitors the 
amount of available free memory in a separate thread. If there is not enough 
memory, the code adjusts the number of partitions in memory, and spills a 
series of partitions/buffers to disk. Also, if the amount of free memory is 
more than expected, some of the on-disk partitions are brought back to memory. 
Additionally, if the amount of free memory is marginal, the mechanism keeps 
memory usage in check by only gradually bringing partitions back to memory.

Test Plan:
mvn clean verify
Unit tests added to giraph-core
End-to-end test added to giraph-example
Running the code on PageRank on a large graph and not getting OOM failures.

Reviewers: maja.kabiljo, sergey.edunov, avery.ching, dionysis.logothetis

Reviewed By: dionysis.logothetis

Differential Revision: https://reviews.facebook.net/D40563


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/03ade425
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/03ade425
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/03ade425

Branch: refs/heads/trunk
Commit: 03ade425dd5a65d3a713d5e7d85aa7605956fbd2
Parents: 1ca3222
Author: Hassan Eslami <[email protected]>
Authored: Mon Jul 27 11:59:21 2015 -0700
Committer: Avery Ching <[email protected]>
Committed: Mon Jul 27 12:20:38 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |    2 +
 findbugs-exclude.xml                            |    8 -
 .../giraph/bsp/CentralizedServiceWorker.java    |    7 +
 .../java/org/apache/giraph/comm/ServerData.java |   15 +-
 .../long_id/LongAbstractListMessageStore.java   |   18 +-
 .../comm/requests/SendWorkerEdgesRequest.java   |    4 +-
 .../requests/SendWorkerVerticesRequest.java     |   16 +-
 .../apache/giraph/conf/GiraphConfiguration.java |    4 +
 .../org/apache/giraph/conf/GiraphConstants.java |   36 +-
 .../apache/giraph/edge/AbstractEdgeStore.java   |   74 +-
 .../java/org/apache/giraph/edge/EdgeStore.java  |   39 +
 .../org/apache/giraph/edge/SimpleEdgeStore.java |   15 +
 .../giraph/edge/primitives/IntEdgeStore.java    |   15 +
 .../giraph/edge/primitives/LongEdgeStore.java   |   14 +
 .../apache/giraph/graph/ComputeCallable.java    |   36 +-
 .../apache/giraph/graph/GraphTaskManager.java   |   13 +-
 .../giraph/ooc/AdaptiveOutOfCoreEngine.java     |  270 +++
 .../apache/giraph/ooc/CheckMemoryCallable.java  |  466 +++++
 .../giraph/ooc/DiskBackedPartitionStore.java    | 1769 ++++++++++++++++++
 .../apache/giraph/ooc/JVMMemoryEstimator.java   |   45 +
 .../org/apache/giraph/ooc/MemoryEstimator.java  |   44 +
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |   43 +
 .../giraph/ooc/OutOfCoreProcessorCallable.java  |  145 ++
 .../org/apache/giraph/ooc/package-info.java     |   21 +
 .../partition/DiskBackedPartitionStore.java     | 1300 -------------
 .../apache/giraph/partition/PartitionStore.java |  132 +-
 .../giraph/partition/SimplePartitionStore.java  |  117 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   61 +-
 .../org/apache/giraph/comm/RequestTest.java     |    2 +-
 .../TestIntFloatPrimitiveMessageStores.java     |    4 +-
 .../TestLongDoublePrimitiveMessageStores.java   |    4 +-
 .../giraph/partition/TestPartitionStores.java   |  250 +--
 .../java/org/apache/giraph/TestOutOfCore.java   |  121 ++
 33 files changed, 3492 insertions(+), 1618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fd31545..c844f61 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 1.2.0 - unreleased
 =======
+  GIRAPH-1022: Out-of-core mechanism for input superstep and graph data 
(heslami via aching)
+
   GIRAPH-1021: Missing progress report for graph mutations. (heslami via 
aching)
 
   GIRAPH-1020: TaskInfo equality condition. (heslami via aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index afdf041..0ab2c73 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -89,14 +89,6 @@
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE"/>
   </Match>
   <Match>
-    <Class 
name="org.apache.giraph.partition.DiskBackedPartitionStore$AddPartition"/>
-    <Bug pattern="UL_UNRELEASED_LOCK"/>
-  </Match>
-  <Match>
-    <Class 
name="org.apache.giraph.partition.DiskBackedPartitionStore$GetPartition"/>
-    <Bug pattern="UL_UNRELEASED_LOCK"/>
-  </Match>
-  <Match>
     <Class name="~org.apache.giraph.function.primitive.PrimitiveRefs.*Ref"/>
     <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 37aed45..f6d77d0 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -245,4 +245,11 @@ public interface CentralizedServiceWorker<I extends 
WritableComparable,
    * previous superstep.
    */
   GlobalStats getGlobalStats();
+
+  /**
+   * Get the number of partitions owned by this worker
+   *
+   * @return number of partitions owned
+   */
+  int getNumPartitionsOwned();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 8269998..eddfbc6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -36,12 +36,10 @@ import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.EdgeStore;
-import org.apache.giraph.edge.EdgeStoreFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
-import org.apache.giraph.partition.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.DiskBackedPartitionStore;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.SimplePartitionStore;
@@ -67,8 +65,6 @@ public class ServerData<I extends WritableComparable,
   private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Partition store for this worker. */
   private volatile PartitionStore<I, V, E> partitionStore;
-  /** Edge store for this worker. */
-  private final EdgeStore<I, V, E> edgeStore;
   /** Message store factory */
   private final MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
   messageStoreFactory;
@@ -144,20 +140,13 @@ public class ServerData<I extends WritableComparable,
               getServiceWorker());
     } else {
       partitionStore =
-          new SimplePartitionStore<I, V, E>(conf, context);
+          new SimplePartitionStore<I, V, E>(conf, context, getServiceWorker());
     }
-    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
-    edgeStoreFactory.initialize(service, conf, context);
-    edgeStore = edgeStoreFactory.newStore();
     ownerAggregatorData = new OwnerAggregatorServerData(context);
     allAggregatorData = new AllAggregatorServerData(context, conf);
     this.context = context;
   }
 
-  public EdgeStore<I, V, E> getEdgeStore() {
-    return edgeStore;
-  }
-
   /**
    * Return the partition store for this worker.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
index ae61de4..d1c33be 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
@@ -78,13 +78,19 @@ public abstract class LongAbstractListMessageStore<M 
extends Writable,
    */
   private void populateMap() { // TODO - can parallelize?
     // populate with vertex ids already known
-    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
-      Partition<LongWritable, ?, ?> partition = service.getPartitionStore()
-          .getOrCreatePartition(partitionId);
-      Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
-      for (Vertex<LongWritable, ?, ?> vertex : partition) {
-        partitionMap.put(vertex.getId().get(), createList());
+    service.getPartitionStore().startIteration();
+    while (true) {
+      Partition partition = service.getPartitionStore().getNextPartition();
+      if (partition == null) {
+        break;
+      }
+      Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
+      for (Object obj : partition) {
+        Vertex vertex = (Vertex) obj;
+        LongWritable vertexId = (LongWritable) vertex.getId();
+        partitionMap.put(vertexId.get(), createList());
       }
+      service.getPartitionStore().putPartition(partition);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
index 510743f..aeda197 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerEdgesRequest.java
@@ -69,8 +69,8 @@ public class SendWorkerEdgesRequest<I extends 
WritableComparable,
         iterator = partitionVertexData.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      serverData.getEdgeStore().
-          addPartitionEdges(iterator.getCurrentFirst(),
+      serverData.getPartitionStore()
+          .addPartitionEdges(iterator.getCurrentFirst(),
               iterator.getCurrentSecond());
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
index fb93dae..fde6e7f 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerVerticesRequest.java
@@ -22,13 +22,11 @@ import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.PairList;
-import org.apache.giraph.utils.VertexIterator;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.partition.Partition;
 import org.apache.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -105,15 +103,9 @@ public class SendWorkerVerticesRequest<I extends 
WritableComparable,
         iterator = workerPartitions.getIterator();
     while (iterator.hasNext()) {
       iterator.next();
-      VertexIterator<I, V, E> vertexIterator =
-          new VertexIterator<I, V, E>(
-          iterator.getCurrentSecond(), getConf());
-
-      Partition<I, V, E> partition;
-      PartitionStore store = serverData.getPartitionStore();
-      partition = store.getOrCreatePartition(iterator.getCurrentFirst());
-      partition.addPartitionVertices(vertexIterator);
-      store.putPartition(partition);
+      serverData.getPartitionStore()
+          .addPartitionVertices(iterator.getCurrentFirst(),
+              iterator.getCurrentSecond());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index a395244..e6931de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -936,6 +936,10 @@ public class GiraphConfiguration extends Configuration
     return NUM_COMPUTE_THREADS.get(this);
   }
 
+  public int getNumOocThreads() {
+    return NUM_OOC_THREADS.get(this);
+  }
+
   /**
    * Set the number of input split threads
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 72d913d..2804192 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -71,6 +71,8 @@ import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.ooc.JVMMemoryEstimator;
+import org.apache.giraph.ooc.MemoryEstimator;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
@@ -89,6 +91,8 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 public interface GiraphConstants {
   /** 1KB in bytes */
   int ONE_KB = 1024;
+  /** 1MB in bytes */
+  int ONE_MB = 1024 * 1024;
 
   /** Mapping related information */
   ClassConfOption<MappingStore> MAPPING_STORE_CLASS =
@@ -967,20 +971,32 @@ public interface GiraphConstants {
       new BooleanConfOption("giraph.useOutOfCoreGraph", false,
           "Enable out-of-core graph.");
 
-  /** Directory to write YourKit snapshots to */
-  String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir";
-  /** Default directory to write YourKit snapshots to */
-  String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%";
+  /**
+   * Memory estimator class used in adaptive out-of-core mechanism for deciding
+   * when data should go to disk.
+   */
+  ClassConfOption<MemoryEstimator> OUT_OF_CORE_MEM_ESTIMATOR =
+      ClassConfOption.create("giraph.outOfCoreMemoryEstimator",
+          JVMMemoryEstimator.class, MemoryEstimator.class,
+          "Memory estimator class used for out-of-core decisions");
+
+  /** Number of threads participating in swapping graph/messages to disk. */
+  IntConfOption NUM_OOC_THREADS =
+      new IntConfOption("giraph.numOutOfCoreThreads", 1,
+          "Number of threads participating in swapping data to disk.");
 
   /** Maximum number of partitions to hold in memory for each worker. */
   IntConfOption MAX_PARTITIONS_IN_MEMORY =
-      new IntConfOption("giraph.maxPartitionsInMemory", 10,
-          "Maximum number of partitions to hold in memory for each worker.");
+      new IntConfOption("giraph.maxPartitionsInMemory", 0,
+          "Maximum number of partitions to hold in memory for each worker. By" 
+
+              " default it is set to 0 (for adaptive out-of-core mechanism");
 
-  /** Set number of sticky partitions if sticky mode is enabled.  */
-  IntConfOption MAX_STICKY_PARTITIONS =
-      new IntConfOption("giraph.stickyPartitions", 0,
-          "Set number of sticky partitions if sticky mode is enabled.");
+
+
+  /** Directory to write YourKit snapshots to */
+  String YOURKIT_OUTPUT_DIR = "giraph.yourkit.outputDir";
+  /** Default directory to write YourKit snapshots to */
+  String YOURKIT_OUTPUT_DIR_DEFAULT = "/tmp/giraph/%JOB_ID%/%TASK_ID%";
 
   /** Keep the zookeeper output for debugging? Default is to remove it. */
   BooleanConfOption KEEP_ZOOKEEPER_DATA =

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index 5d15707..9609047 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -34,10 +34,11 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 
@@ -130,6 +131,23 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
   protected abstract OutEdges<I, E> getPartitionEdges(Et entry);
 
   /**
+   * Writes the given key to the output
+   *
+   * @param key input key to be written
+   * @param output output to write the key to
+   */
+  protected abstract void writeVertexKey(K key, DataOutput output)
+  throws IOException;
+
+  /**
+   * Reads the given key from the input
+   *
+   * @param input input to read the key from
+   * @return Key read from the input
+   */
+  protected abstract K readVertexKey(DataInput input) throws IOException;
+
+  /**
    * Get iterator for partition edges
    *
    * @param partitionEdges map of out-edges for vertices in a partition
@@ -138,6 +156,40 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
   protected abstract Iterator<Et>
   getPartitionEdgesIterator(Map<K, OutEdges<I, E>> partitionEdges);
 
+  @Override
+  public boolean hasPartitionEdges(int partitionId) {
+    return transientEdges.containsKey(partitionId);
+  }
+
+  @Override
+  public void writePartitionEdgeStore(int partitionId, DataOutput output)
+      throws IOException {
+    Map<K, OutEdges<I, E>> edges = transientEdges.remove(partitionId);
+    output.writeInt(edges.size());
+    for (Map.Entry<K, OutEdges<I, E>> edge : edges.entrySet()) {
+      writeVertexKey(edge.getKey(), output);
+      edge.getValue().write(output);
+    }
+  }
+
+  @Override
+  public void readPartitionEdgeStore(int partitionId, DataInput input)
+      throws IOException {
+    if (transientEdges.containsKey(partitionId)) {
+      throw new IllegalStateException("readPartitionEdgeStore: reading a " +
+          "partition that is already there in the partition store " +
+          "(impossible)");
+    }
+    Map<K, OutEdges<I, E>> partitionEdges = getPartitionEdges(partitionId);
+    int numEntries = input.readInt();
+    for (int i = 0; i < numEntries; ++i) {
+      K vertexKey = readVertexKey(input);
+      OutEdges<I, E> edges = configuration.createAndInitializeInputOutEdges();
+      edges.readFields(input);
+      partitionEdges.put(vertexKey, edges);
+    }
+  }
+
   /**
    * Get out-edges for a given vertex
    *
@@ -199,9 +251,7 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
       LOG.info("moveEdgesToVertices: Moving incoming edges to vertices.");
     }
 
-    final BlockingQueue<Integer> partitionIdQueue =
-        new ArrayBlockingQueue<>(transientEdges.size());
-    partitionIdQueue.addAll(transientEdges.keySet());
+    service.getPartitionStore().startIteration();
     int numThreads = configuration.getNumInputSplitsThreads();
 
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
@@ -212,11 +262,19 @@ public abstract class AbstractEdgeStore<I extends 
WritableComparable,
           public Void call() throws Exception {
             Integer partitionId;
             I representativeVertexId = configuration.createVertexId();
-            while ((partitionId = partitionIdQueue.poll()) != null) {
+            while (true) {
               Partition<I, V, E> partition =
-                  
service.getPartitionStore().getOrCreatePartition(partitionId);
+                  service.getPartitionStore().getNextPartition();
+              if (partition == null) {
+                break;
+              }
               Map<K, OutEdges<I, E>> partitionEdges =
-                  transientEdges.remove(partitionId);
+                  transientEdges.remove(partition.getId());
+              if (partitionEdges == null) {
+                service.getPartitionStore().putPartition(partition);
+                continue;
+              }
+
               Iterator<Et> iterator =
                   getPartitionEdgesIterator(partitionEdges);
               // process all vertices in given partition

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index 912e25c..1c9d85f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -22,6 +22,10 @@ import org.apache.giraph.utils.VertexIdEdges;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 /**
  * Collects incoming edges for vertices owned by this worker.
  *
@@ -45,4 +49,39 @@ public interface EdgeStore<I extends WritableComparable,
    * Note: this method is not thread-safe.
    */
   void moveEdgesToVertices();
+
+  /**
+   * Whether the store contains edges for the given partition.
+   * Note: This method is thread-safe
+   *
+   * @param partitionId Partition id under query
+   * @return true if the store has any edge for the given partition, false
+   *         otherwise
+   */
+  boolean hasPartitionEdges(int partitionId);
+
+  /**
+   * Serialize the edges of a given partition, and removes the associated 
data
+   * from the store.
+   * Note: This method is not thread-safe (i.e. should not be called for the
+   * same partition at the same time).
+   *
+   * @param partitionId Id of partition to serialize
+   * @param output Output to write the edge store to
+   */
+  void writePartitionEdgeStore(int partitionId, DataOutput output)
+      throws IOException;
+
+  /**
+   * Deserialize the edges of a given partition, and adds them to the edge
+   * store (assumes that the edge store does not have any edge from the
+   * partition already).
+   * Note: This method is not thread-safe (i.e. should not be called for the
+   * same partition at the same time).
+   *
+   * @param partitionId Id of partition to deserialize
+   * @param input Input to read the partition from
+   */
+  void readPartitionEdgeStore(int partitionId, DataInput input)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
index 3eb97d6..19ddc07 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/SimpleEdgeStore.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.util.Progressable;
 
 import com.google.common.collect.MapMaker;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -93,6 +96,18 @@ public class SimpleEdgeStore<I extends WritableComparable,
   }
 
   @Override
+  protected void writeVertexKey(I key, DataOutput output) throws IOException {
+    key.write(output);
+  }
+
+  @Override
+  protected I readVertexKey(DataInput input) throws IOException {
+    I id = configuration.createVertexId();
+    id.readFields(input);
+    return id;
+  }
+
+  @Override
   protected Iterator<Map.Entry<I, OutEdges<I, E>>>
   getPartitionEdgesIterator(Map<I, OutEdges<I, E>> partitionEdges) {
     return partitionEdges.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
index b138f49..253c68c 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/IntEdgeStore.java
@@ -31,6 +31,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -81,6 +84,18 @@ public class IntEdgeStore<V extends Writable, E extends 
Writable>
   }
 
   @Override
+  protected void writeVertexKey(Integer key, DataOutput output)
+      throws IOException {
+    output.writeInt(key);
+  }
+
+  @Override
+  protected Integer readVertexKey(DataInput input)
+      throws IOException {
+    return input.readInt();
+  }
+
+  @Override
   protected Iterator<Int2ObjectMap.Entry<OutEdges<IntWritable, E>>>
   getPartitionEdgesIterator(
     Map<Integer, OutEdges<IntWritable, E>> partitionEdges) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
index 61f908a..db3ebe5 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/edge/primitives/LongEdgeStore.java
@@ -31,6 +31,9 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -82,6 +85,17 @@ public class LongEdgeStore<V extends Writable, E extends 
Writable>
   }
 
   @Override
+  protected void writeVertexKey(Long key, DataOutput output)
+      throws IOException {
+    output.writeLong(key);
+  }
+
+  @Override
+  protected Long readVertexKey(DataInput input) throws IOException {
+    return input.readLong();
+  }
+
+  @Override
   protected Iterator<Long2ObjectMap.Entry<OutEdges<LongWritable, E>>>
   getPartitionEdgesIterator(
       Map<Long, OutEdges<LongWritable, E>> partitionEdges) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index e44a794..923e427 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -20,7 +20,6 @@ package org.apache.giraph.graph;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -34,6 +33,7 @@ import org.apache.giraph.metrics.MetricNames;
 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStats;
+import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -61,9 +61,9 @@ import com.yammer.metrics.core.Histogram;
  * when using the out-of-core graph partition store.  We should only load on
  * demand.
  *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
+ * @param <I>  Vertex index value
+ * @param <V>  Vertex value
+ * @param <E>  Edge value
  * @param <M1> Incoming message type
  * @param <M2> Outgoing message type
  */
@@ -80,8 +80,6 @@ public class ComputeCallable<I extends WritableComparable, V 
extends Writable,
   private final Mapper<?, ?, ?, ?>.Context context;
   /** Graph state */
   private final GraphState graphState;
-  /** Thread-safe queue of all partition ids */
-  private final BlockingQueue<Integer> partitionIdQueue;
   /** Message store */
   private final MessageStore<I, M1> messageStore;
   /** Configuration */
@@ -105,23 +103,18 @@ public class ComputeCallable<I extends 
WritableComparable, V extends Writable,
 
   /**
    * Constructor
-   *
    * @param context Context
    * @param graphState Current graph state (use to create own graph state)
    * @param messageStore Message store
-   * @param partitionIdQueue Queue of partition ids (thread-safe)
    * @param configuration Configuration
    * @param serviceWorker Service worker
    */
-  public ComputeCallable(
-      Mapper<?, ?, ?, ?>.Context context, GraphState graphState,
-      MessageStore<I, M1> messageStore,
-      BlockingQueue<Integer> partitionIdQueue,
+  public ComputeCallable(Mapper<?, ?, ?, ?>.Context context,
+      GraphState graphState, MessageStore<I, M1> messageStore,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
       CentralizedServiceWorker<I, V, E> serviceWorker) {
     this.context = context;
     this.configuration = configuration;
-    this.partitionIdQueue = partitionIdQueue;
     this.messageStore = messageStore;
     this.serviceWorker = serviceWorker;
     this.graphState = graphState;
@@ -155,16 +148,14 @@ public class ComputeCallable<I extends 
WritableComparable, V extends Writable,
     computation.preSuperstep();
 
     List<PartitionStats> partitionStatsList = Lists.newArrayList();
-    while (!partitionIdQueue.isEmpty()) {
-      Integer partitionId = partitionIdQueue.poll();
-      if (partitionId == null) {
+    PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
+    while (true) {
+      long startTime = System.currentTimeMillis();
+      Partition<I, V, E> partition = partitionStore.getNextPartition();
+      if (partition == null) {
         break;
       }
 
-      long startTime = System.currentTimeMillis();
-      Partition<I, V, E> partition =
-          serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
-
       try {
         serviceWorker.getServerData().resolvePartitionMutation(partition);
         PartitionStats partitionStats =
@@ -179,7 +170,7 @@ public class ComputeCallable<I extends WritableComparable, 
V extends Writable,
         messageBytesSentCounter.inc(partitionMsgBytes);
         timedLogger.info("call: Completed " +
             partitionStatsList.size() + " partitions, " +
-            partitionIdQueue.size() + " remaining " +
+            partitionStore.getNumPartitions() + " remaining " +
             MemoryUtils.getRuntimeMemoryStats());
       } catch (IOException e) {
         throw new IllegalStateException("call: Caught unexpected IOException," 
+
@@ -188,9 +179,8 @@ public class ComputeCallable<I extends WritableComparable, 
V extends Writable,
         throw new IllegalStateException("call: Caught unexpected " +
             "InterruptedException, failing.", e);
       } finally {
-        serviceWorker.getPartitionStore().putPartition(partition);
+        partitionStore.putPartition(partition);
       }
-
       histogramComputePerPartition.update(
           System.currentTimeMillis() - startTime);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 5c80297..0844858 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -25,8 +25,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -346,7 +344,7 @@ end[PURE_YARN]*/
       // execute the current superstep
       if (numPartitions > 0) {
         processGraphPartitions(context, partitionStatsList, graphState,
-          messageStore, numPartitions, numThreads);
+          messageStore, numThreads);
       }
       finishedSuperstepStats = completeSuperstepAndCollectStats(
         partitionStatsList, superstepTimerContext);
@@ -723,26 +721,22 @@ end[PURE_YARN]*/
    * @param partitionStatsList to pick up this superstep's processing stats
    * @param graphState the BSP graph state
    * @param messageStore the messages to be processed in this superstep
-   * @param numPartitions the number of data partitions (vertices) to process
    * @param numThreads number of concurrent threads to do processing
    */
   private void processGraphPartitions(final Mapper<?, ?, ?, ?>.Context context,
       List<PartitionStats> partitionStatsList,
       final GraphState graphState,
       final MessageStore<I, Writable> messageStore,
-      int numPartitions,
       int numThreads) {
-    final BlockingQueue<Integer> computePartitionIdQueue =
-      new ArrayBlockingQueue<Integer>(numPartitions);
-    long verticesToCompute = 0;
     PartitionStore<I, V, E> partitionStore = serviceWorker.getPartitionStore();
+    long verticesToCompute = 0;
     for (Integer partitionId : partitionStore.getPartitionIds()) {
-      computePartitionIdQueue.add(partitionId);
       verticesToCompute += partitionStore.getPartitionVertexCount(partitionId);
     }
     WorkerProgress.get().startSuperstep(
         serviceWorker.getSuperstep(), verticesToCompute,
         serviceWorker.getPartitionStore().getNumPartitions());
+    partitionStore.startIteration();
 
     GiraphTimerContext computeAllTimerContext = computeAll.time();
     timeToFirstMessageTimerContext = timeToFirstMessage.time();
@@ -756,7 +750,6 @@ end[PURE_YARN]*/
               context,
               graphState,
               messageStore,
-              computePartitionIdQueue,
               conf,
               serviceWorker);
         }

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
new file mode 100644
index 0000000..8d3cab6
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Adaptive out-of-core mechanism. This mechanism spawns two types of threads:
+ *   1) check-memory thread, which periodically monitors the amount of 
available
+ *      memory and decides whether data should go on disk. This threads is
+ *      basically the brain behind the out-of-core mechanism, commands
+ *      "out-of-core processor threads" (type 2 thread below) to move
+ *      appropriate data to disk,
+ *   2) out-of-core processor threads. This is a team of threads responsible 
for
+ *      offloading appropriate data to disk. "check-memory thread" decides on
+ *      which data should go to disk, and "out-of-core processor threads" do 
the
+ *      offloading.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge data
+ */
+public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements
+    OutOfCoreEngine<I, V, E> {
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(AdaptiveOutOfCoreEngine.class);
+
+  // ---- Synchronization Variables ----
+  /** Barrier to coordinate check-memory and OOC-processing threads */
+  private final CyclicBarrier gate;
+  /**
+   * Signal to determine whether OOC processing threads are done processing OOC
+   * requests
+   */
+  private final CyclicBarrier doneOocSignal;
+  /** Signal to determine whether the computation is terminated */
+  private final CountDownLatch doneCompute;
+  /** Finisher signal to OOC processing threads */
+  private volatile boolean done;
+
+  // ---- OOC Commands ----
+  /**
+   * List of partitions that are on disk, and their loaded *vertices* during
+   * INPUT_SUPERSTEP are ready to flush to disk
+   */
+  private final BlockingQueue<Integer> partitionsWithInputVertices;
+  /**
+   * List of partitions that are on disk, and their loaded *edges* during
+   * INPUT_SUPERSTEP are ready to flush to disk
+   */
+  private final BlockingQueue<Integer> partitionsWithInputEdges;
+  /** Number of partitions to be written to the disk */
+  private final AtomicInteger numPartitionsToSpill;
+
+  /** Executor service for check memory thread */
+  private ExecutorService checkMemoryExecutor;
+  /** Executor service for out-of-core processor threads */
+  private ExecutorService outOfCoreProcessorExecutor;
+
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Worker */
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
+
+  /** Cached value for number of out-of-core threads specified by user */
+  private int numOocThreads;
+
+  /** Result of check-memory thread (to be checked for graceful termination) */
+  private Future<Void> checkMemoryResult;
+  /**
+   * Results of out-of-core processor threads (to be checked for graceful
+   * termination)
+   */
+  private List<Future<Void>> oocProcessorResults;
+
+  /**
+   * Creates an instance of adaptive mechanism
+   * @param conf Configuration
+   * @param serviceWorker Worker service
+   */
+  public AdaptiveOutOfCoreEngine(ImmutableClassesGiraphConfiguration conf,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
+    this.conf = conf;
+    this.serviceWorker = serviceWorker;
+
+    this.numOocThreads = conf.getNumOocThreads();
+    this.gate = new CyclicBarrier(numOocThreads + 1);
+    this.doneOocSignal = new CyclicBarrier(numOocThreads + 1);
+    this.doneCompute = new CountDownLatch(1);
+    this.done = false;
+    this.partitionsWithInputVertices = new ArrayBlockingQueue<Integer>(100);
+    this.partitionsWithInputEdges = new ArrayBlockingQueue<Integer>(100);
+    this.numPartitionsToSpill = new AtomicInteger(0);
+  }
+
+  @Override
+  public void initialize() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("initialize: initializing out-of-core engine");
+    }
+    CallableFactory<Void> checkMemoryCallableFactory =
+      new CallableFactory<Void>() {
+        @Override
+        public Callable<Void> newCallable(int callableId) {
+          return new CheckMemoryCallable<I, V, E>(
+              AdaptiveOutOfCoreEngine.this, conf, serviceWorker);
+        }
+      };
+    checkMemoryExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("check-memory").build());
+    checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>(
+        checkMemoryCallableFactory.newCallable(0)));
+
+    CallableFactory<Void> outOfCoreProcessorCallableFactory =
+      new CallableFactory<Void>() {
+        @Override
+        public Callable<Void> newCallable(int callableId) {
+          return new OutOfCoreProcessorCallable<I, V, E>(
+              AdaptiveOutOfCoreEngine.this, serviceWorker);
+        }
+      };
+    outOfCoreProcessorExecutor = Executors
+        .newFixedThreadPool(numOocThreads,
+            new ThreadFactoryBuilder().setNameFormat("ooc-%d").build());
+    oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads);
+    for (int i = 0; i < numOocThreads; ++i) {
+      Future<Void> future = outOfCoreProcessorExecutor.submit(
+          new LogStacktraceCallable<>(
+              outOfCoreProcessorCallableFactory.newCallable(i)));
+      oocProcessorResults.add(future);
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    doneCompute.countDown();
+    checkMemoryExecutor.shutdown();
+    if (checkMemoryResult.isCancelled()) {
+      throw new IllegalStateException(
+          "shutdown: memory check thread did not " + "terminate gracefully!");
+    }
+    outOfCoreProcessorExecutor.shutdown();
+    for (int i = 0; i < numOocThreads; ++i) {
+      if (oocProcessorResults.get(i).isCancelled()) {
+        throw new IllegalStateException("shutdown: out-of-core processor " +
+            "thread " + i + " did not terminate gracefully.");
+      }
+    }
+  }
+
+  /**
+   * @return the latch that signals whether the whole computation is done
+   */
+  public CountDownLatch getDoneCompute() {
+    return doneCompute;
+  }
+
+  /**
+   * @return whether the computation is done
+   */
+  public boolean isDone() {
+    return done;
+  }
+
+  /**
+   * @return list of partitions that have large enough buffers of vertices read
+   *         in INPUT_SUPERSTEP.
+   */
+  public BlockingQueue<Integer> getPartitionsWithInputVertices() {
+    return partitionsWithInputVertices;
+  }
+
+  /**
+   * @return list of partitions that have large enough buffers of edges read
+   *         in INPUT_SUPERSTEP.
+   */
+  public BlockingQueue<Integer> getPartitionsWithInputEdges() {
+    return partitionsWithInputEdges;
+  }
+
+  /**
+   * @return number of partitions to spill to disk
+   */
+  public AtomicInteger getNumPartitionsToSpill() {
+    return numPartitionsToSpill;
+  }
+
+  /**
+   * Wait on gate with which OOC processor threads are notified to execute
+   * commands provided by brain (memory-check thread).
+   *
+   * @throws BrokenBarrierException
+   * @throws InterruptedException
+   */
+  public void waitOnGate() throws BrokenBarrierException, InterruptedException 
{
+    gate.await();
+  }
+
+  /**
+   * Reset the gate for reuse.
+   */
+  public void resetGate() {
+    gate.reset();
+  }
+
+  /**
+   * Wait on signal from all OOC processor threads that the offloading of data
+   * is complete.
+   *
+   * @throws BrokenBarrierException
+   * @throws InterruptedException
+   */
+  public void waitOnOocSignal()
+      throws BrokenBarrierException, InterruptedException {
+    doneOocSignal.await();
+  }
+
+  /**
+   * Reset the completion signal of OOC processor threads for reuse.
+   */
+  public void resetOocSignal() {
+    doneOocSignal.reset();
+  }
+
+  /**
+   * Set the computation as done (i.e. setting the state that determines the
+   * whole computation is done).
+   */
+  public void setDone() {
+    done = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
new file mode 100644
index 0000000..7f52490
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/CheckMemoryCallable.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.util.Stack;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Adaptive out-of-core mechanism brain. This class provides one thread per
+ * worker that periodically checks the free memory on the worker and compares 
it
+ * with total amount of memory given to that worker to run the job. The period
+ * at which the thread checks for the memory is specified by the user. Also,
+ * user can specify the fraction of memory where anytime free memory is less
+ * than that fraction of total memory, actions would be taken to free up space
+ * in memory (this fraction is called LOW_FREE_MEMORY_FRACTION). Also, user can
+ * specify another fraction of available memory where memory pressure is fair
+ * and some of the data on disk (if there is any) can be brought back to memory
+ * again (this fraction is called FAIR_FREE_MEMORY_FRACTION).
+ *
+ * In the adaptive out-of-core mechanism, if amount of free memory becomes less
+ * than LOW_FREE_MEMORY_FRACTION, some data are being considered as potentials
+ * to transfer to disk. These data can be in the following categories:
+ *   1) Vertex buffers read in INPUT_SUPERSTEP. These are vertex input splits
+ *      read for a partition that is out-of-core and PartitionStore holds these
+ *      vertex buffers in in-memory buffers (and postpone their merge with the
+ *      actual partition until the partition is loaded back in memory).
+ *   2) Edge buffers read in INPUT_SUPERSTEP. These are similar buffers to
+ *      vertex buffers, but they keep edge data in INPUT_SUPERSTEP.
+ *   3) Partitions.
+ *
+ * This brain prefers the first two categories in INPUT_SUPERSTEP as long as
+ * size of buffers are large enough that it is worth writing them to disk. In
+ * case where brain decides on spilling partitions to disk, the brain decides
+ * only on the "number of partitions" to spill to disk. It is "out-of-core
+ * processor threads" responsibility to find that many partitions to spill to
+ * disk. The number of partitions to spill is a fraction of number of 
partitions
+ * currently in memory. It is recommended that this fraction be equal to
+ * subtraction of LOW_FREE_MEMORY_FRACTION from FAIR_FREE_MEMORY_FRACTION. Here
+ * is an example to clarify on this recommendation. Assume
+ * LOW_FREE_MEMORY_FRACTION is 5% and FAIR_FREE_MEMORY_FRACTION is 15%. Also
+ * assume that the partitions are similar in their memory footprint (which is a
+ * valid assumption for most of the partitioning techniques). If free memory is
+ * a bit less than 5% of total available memory, if we offload 10%
+ * (15% - 5% = 10%), then the amount of free memory will increase to a bit less
+ * than 15% of total available memory.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class CheckMemoryCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements Callable<Void> {
+  /**
+   * Lowest free memory fraction to start doing necessary actions to go
+   * out-of-core.
+   */
+  public static final FloatConfOption LOW_FREE_MEMORY_FRACTION =
+      new FloatConfOption("giraph.lowFreeMemoryFraction", 0.1f,
+          "If free memory fraction goes below this value, GC is called " +
+              "manually and necessary actions are taken if we have to go " +
+              "out-of-core");
+  /**
+   * Expected memory fraction to achieve after detecting that the job is 
running
+   * low in memory. Basically, this memory fraction is the target to achieve
+   * once we decide to offload data on disk.
+   */
+  public static final FloatConfOption MID_FREE_MEMORY_FRACTION =
+      new FloatConfOption("giraph.midFreeMemoryFraction", 0.15f,
+          "Once out-of-core mechanism decides to offload data on disk, it " +
+              "offloads data on disk until free memory fraction reaches this " 
+
+              "fraction.");
+  /**
+   * Memory fraction at which the job gets the best performance considering the
+   * choice of GC strategy. It means, if the amount of free memory is more than
+   * this fraction we will not see severe amount of GC calls.
+   */
+  public static final FloatConfOption FAIR_FREE_MEMORY_FRACTION =
+      new FloatConfOption("giraph.fairFreeMemoryFraction", 0.3f,
+          "The fraction of free memory at which the job shows the best GC " +
+              "performance. This fraction might be dependent on GC strategy " +
+              "used in running the job, but generally 0.3 is a reasonable " +
+              "fraction for most strategies.");
+  /**
+   * Memory fraction at which the job has enough space so we can back off from
+   * the last out-of-core decision, i.e. lazily bringing the last bunch of data
+   * spilled to disk.
+   */
+  public static final FloatConfOption HIGH_FREE_MEMORY_FRACTION =
+      new FloatConfOption("giraph.highFreeMemoryFraction", 0.4f,
+          "Once free memory reaches at this fraction, last out-of-core " +
+              "decision is lazily rolled back, i.e. we back off from " +
+              "out-of-core.");
+  /** Time interval at which checking memory is done periodically. */
+  public static final IntConfOption CHECK_MEMORY_INTERVAL =
+      new IntConfOption("giraph.checkMemoryInterval", 5000,
+          "Time interval (in milliseconds) at which checking memory is done" +
+              " to decide if there should be any out-of-core action.");
+  /** Coefficient by which the number of partitions in memory changes. */
+  public static final FloatConfOption OOC_GRAPH_MODIFICATION_COEFFICIENT =
+      new FloatConfOption("giraph.graphPartitionModificationCoefficient", 0.3f,
+          "If we decide to go out-of-core or back-off from out-of-core, this " 
+
+              "is the multiplier by which the number of in-memory partitions" +
+              "will change.");
+
+  /** Class logger */
+  private static final Logger LOG = 
Logger.getLogger(CheckMemoryCallable.class);
+
+  /** Worker */
+  private final CentralizedServiceWorker<I, V, E> serviceWorker;
+  /** Partition store */
+  private final DiskBackedPartitionStore<I, V, E> partitionStore;
+
+  // ---- Cached Config Values ----
+  /** Cached value of LOW_FREE_MEMORY_FRACTION */
+  private float lowFreeMemoryFraction;
+  /** Cached value for MID_FREE_MEMORY_FRACTION */
+  private float midFreeMemoryFraction;
+  /** Cached value of FAIR_FREE_MEMORY_FRACTION */
+  private float fairFreeMemoryFraction;
+  /** Cached value for HIGH_FREE_MEMORY_FRACTION */
+  private float highFreeMemoryFraction;
+  /** Cached value of CHECK_MEMORY_INTERVAL */
+  private int checkInterval;
+  /** Cached value for OOC_GRAPH_MODIFICATION_COEFFICIENT */
+  private float modificationCoefficient;
+
+  /** List of counts of number of partitions every time we shrink the store */
+  private Stack<Integer> oocPartitionCounts;
+  /** Memory estimator instance */
+  private final MemoryEstimator memoryEstimator;
+  /** Adaptive out-of-core engine */
+  private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine;
+
+  /**
+   * Constructor for check-memory thread.
+   *
+   * @param oocEngine out-of-core engine
+   * @param conf job configuration
+   * @param serviceWorker worker service
+   */
+  public CheckMemoryCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine,
+      ImmutableClassesGiraphConfiguration<I, V, E> conf,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
+    this.oocEngine = oocEngine;
+    this.serviceWorker = serviceWorker;
+    this.partitionStore =
+        (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore();
+
+    this.oocPartitionCounts = new Stack<>();
+
+    this.lowFreeMemoryFraction = LOW_FREE_MEMORY_FRACTION.get(conf);
+    this.midFreeMemoryFraction = MID_FREE_MEMORY_FRACTION.get(conf);
+    this.fairFreeMemoryFraction = FAIR_FREE_MEMORY_FRACTION.get(conf);
+    this.highFreeMemoryFraction = HIGH_FREE_MEMORY_FRACTION.get(conf);
+    this.checkInterval = CHECK_MEMORY_INTERVAL.get(conf);
+    this.modificationCoefficient = 
OOC_GRAPH_MODIFICATION_COEFFICIENT.get(conf);
+
+    memoryEstimator = ReflectionUtils
+        .newInstance(GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR.get(conf));
+  }
+
+  /**
+   * Checks whether the available free memory is enough for an efficient
+   * execution. If memory is limited, offload partitions to disk.
+   * Also, if available memory is more than a threshold, loads partitions from
+   * disk (if there is any) to memory.
+   */
+  @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("DM_GC")
+  public Void call() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("call: check-memory thread started.");
+    }
+    memoryEstimator.initialize(serviceWorker);
+    CountDownLatch doneCompute = oocEngine.getDoneCompute();
+    while (doneCompute.getCount() != 0) {
+      double maxMemory = memoryEstimator.maxMemoryMB();
+      double freeMemory = memoryEstimator.freeMemoryMB();
+      boolean gcDone = false;
+      if (freeMemory < lowFreeMemoryFraction * maxMemory) {
+        // This is typically a bad scenario where previous GCs were not
+        // successful to free up enough memory. If we keep staying in this
+        // situation, usually, either the computation slows down dramatically,
+        // or the computation throws OOM error. So, we do GC manually, and
+        // make sure that out-of-core is the solution to get out of this
+        // situation.
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: Memory is very limited now. Calling GC manually. " +
+              String.format("freeMemory = %.2fMB", freeMemory));
+        }
+        long gcStartTime = System.currentTimeMillis();
+        System.gc();
+        gcDone = true;
+        freeMemory = memoryEstimator.freeMemoryMB();
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: GC is done. " + String
+              .format("GC time = %.2f sec, and freeMemory = %.2fMB",
+                  (System.currentTimeMillis() - gcStartTime) / 1000.0,
+                  freeMemory));
+        }
+      }
+
+      // If we have enough memory, we roll back the latest shrink in number of
+      // partition slots.
+      // If we do not have enough memory, but we are not in a bad scenario
+      // either, we gradually increase the number of partition slots in memory.
+      // If we are low in free memory, we first push unnecessary data to disk
+      // and then push some partitions to disk if necessary.
+      int numInMemory = partitionStore.getNumPartitionInMemory();
+      int maxInMemory = partitionStore.getNumPartitionSlots();
+      int numInTotal = partitionStore.getNumPartitions();
+      if (freeMemory > highFreeMemoryFraction * maxMemory) {
+        if (numInMemory >= maxInMemory && !oocPartitionCounts.isEmpty()) {
+          partitionStore.increasePartitionSlots(oocPartitionCounts.pop());
+        }
+      } else if (freeMemory > fairFreeMemoryFraction * maxMemory) {
+        // Only gradually increase the number of partition slots if all slots
+        // are already used, and we have things out-of-core
+        if (!oocPartitionCounts.isEmpty() || maxInMemory < numInTotal) {
+          if (numInMemory >= maxInMemory) {
+            partitionStore.increasePartitionSlots(1);
+            if (!oocPartitionCounts.isEmpty()) {
+              int num = oocPartitionCounts.pop();
+              if (num > 1) {
+                oocPartitionCounts.push(num - 1);
+              }
+            }
+          }
+        }
+      } else if (gcDone && freeMemory < midFreeMemoryFraction * maxMemory) {
+        BlockingQueue<Integer> partitionsWithInputVertices =
+            oocEngine.getPartitionsWithInputVertices();
+        BlockingQueue<Integer> partitionsWithInputEdges =
+            oocEngine.getPartitionsWithInputEdges();
+        AtomicInteger numPartitionsToSpill =
+            oocEngine.getNumPartitionsToSpill();
+
+        while (freeMemory < midFreeMemoryFraction * maxMemory) {
+          // Offload input vertex buffer of OOC partitions if we are in
+          // INPUT_SUPERSTEP
+          if (serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+            // List of pairs (partitionId, approximate memory footprint of
+            // vertex buffers of that partition).
+            PairList<Integer, Integer> pairs =
+                partitionStore.getOocPartitionIdsWithPendingInputVertices();
+            freeMemory -= createCommand(pairs, partitionsWithInputVertices);
+          }
+
+          // Offload edge store of OOC partitions if we are in INPUT_SUPERSTEP
+          if (freeMemory < midFreeMemoryFraction * maxMemory &&
+              serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+            PairList<Integer, Integer> pairs =
+                partitionStore.getOocPartitionIdsWithPendingInputEdges();
+            freeMemory -= createCommand(pairs, partitionsWithInputEdges);
+          }
+
+          // Offload partitions if we are still low in free memory
+          if (freeMemory < midFreeMemoryFraction * maxMemory) {
+            numPartitionsToSpill
+                .set(getNextOocPartitionCount(freeMemory, maxMemory));
+          }
+
+          if (!partitionsWithInputVertices.isEmpty() ||
+              !partitionsWithInputEdges.isEmpty() ||
+              numPartitionsToSpill.get() != 0) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("call: signal out-of-core processor threads to start " +
+                  "offloading. These threads will spill vertex buffer of " +
+                  partitionsWithInputVertices.size() + " partitions, edge " +
+                  "buffers of " + partitionsWithInputEdges.size() +
+                  " partitions, and " + numPartitionsToSpill.get() + " whole " 
+
+                  "partition");
+            }
+            // Opening the gate for OOC processing threads to start spilling
+            // data on disk
+            try {
+              oocEngine.waitOnGate();
+            } catch (InterruptedException e) {
+              throw new IllegalStateException("call: Caught " +
+                  "InterruptedException while opening the gate for OOC " +
+                  "processing threads");
+            } catch (BrokenBarrierException e) {
+              throw new IllegalStateException("call: Caught " +
+                  "BrokenBarrierException while opening the gate for OOC " +
+                  "processing threads");
+            }
+            oocEngine.resetGate();
+
+            if (LOG.isInfoEnabled()) {
+              LOG.info("call: waiting on OOC processors to finish offloading " 
+
+                  "data to disk");
+            }
+            // Wait until all OOC processing threads are done swapping data to
+            // disk
+            try {
+              oocEngine.waitOnOocSignal();
+            } catch (InterruptedException e) {
+              throw new IllegalStateException("call: Caught " +
+                  "InterruptedException. Looks like memory check thread is " +
+                  "interrupted while waiting on OOC processing threads.");
+            } catch (BrokenBarrierException e) {
+              throw new IllegalStateException("call: Caught " +
+                  "BrokenBarrierException. Looks like some OOC processing " +
+                  "threads  broke while writing data on disk.");
+            }
+            oocEngine.resetOocSignal();
+          }
+
+          gcDone = false;
+          long gcStartTime = 0;
+          if (freeMemory < midFreeMemoryFraction * maxMemory) {
+            // Calling GC manually to actually free up the memory for data that
+            // is offloaded to disk
+            if (LOG.isInfoEnabled()) {
+              LOG.info("call: calling GC manually to free up space for " +
+                  "recently offloaded data.");
+            }
+            gcStartTime = System.currentTimeMillis();
+            System.gc();
+            gcDone = true;
+          }
+          freeMemory = memoryEstimator.freeMemoryMB();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("call: " +
+                (gcDone ?
+                    ("GC is done. " + String.format("GC time = %.2f sec.",
+                        (System.currentTimeMillis() - gcStartTime) / 1000.0)) :
+                    "") +
+                "Finished offloading data to disk. " +
+                String.format("freeMemory = %.2fMB", freeMemory));
+          }
+        }
+      }
+
+      // Either wait for the computation to be done, or the time interval passes
+      try {
+        doneCompute.await(checkInterval, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("call: Caught InterruptedException " +
+            "while waiting for computation to be done and/or " + checkInterval +
+            "milliseconds passes.");
+      }
+    }
+
+    // Setting 'done' before the gate here and checking 'done' in OOC processing
+    // threads after the gate, guarantees that OOC processing threads see the
+    // new value of done and terminate gracefully.
+    oocEngine.setDone();
+    try {
+      oocEngine.waitOnGate();
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("call: Caught InterruptedException " +
+          "while waiting for the last time on gate in the current superstep");
+    } catch (BrokenBarrierException e) {
+      throw new IllegalStateException("call: Caught BrokenBarrierException " +
+          "while waiting for the last time on gate in the current superstep");
+    }
+    return null;
+  }
+
+  /**
+   * Returns the number of partitions that should go out-of-core at this point.
+   *
+   * @param freeMemory amount of free memory (in MB)
+   * @param maxMemory amount of max memory (in MB)
+   * @return number of partitions that should go out-of-core
+   */
+  private int getNextOocPartitionCount(double freeMemory, double maxMemory) {
+    int numSlots = partitionStore.getNumPartitionSlots();
+    // Integer.MAX_VALUE appears to be a sentinel for "slot count not yet
+    // bounded"; pin it to the current partition count so later spill
+    // decisions shrink from a concrete capacity -- TODO confirm against
+    // the partition store implementation.
+    if (numSlots == Integer.MAX_VALUE) {
+      numSlots = partitionStore.getNumPartitions();
+      partitionStore.setNumPartitionSlots(numSlots);
+    }
+
+    double freeFraction = freeMemory / maxMemory;
+    double multiplier = Math.min(
+        // User-specified favorable size to spill to disk
+        modificationCoefficient,
+        // Approximate fraction of current data to spill in order to reach the
+        // fair fraction of free memory
+        (fairFreeMemoryFraction - freeFraction) / (1 - freeFraction));
+    // Spill at least one partition whenever this method is consulted (a
+    // negative/zero multiplier still yields a count of 1)
+    int count = Math.max((int) (numSlots * multiplier), 1);
+    if (count >= numSlots) {
+      LOG.warn("getNextOocPartitionCount: Memory capacity is " +
+          numSlots + " partitions, and OOC mechanism is " +
+          "trying to put " + count + " partitions to disk. This is not " +
+          "possible");
+      // We should have at least one partition in memory
+      count = numSlots - 1;
+      if (count == 0) {
+        LOG.warn("It seems that size of one partition is too large for the " +
+            "available memory.  Try to run the job with more partitions!");
+      }
+    }
+    // Record the non-zero spill decision; presumably consumed when deciding
+    // how many partitions to bring back to memory later -- verify against
+    // the reader of oocPartitionCounts.
+    if (count != 0) {
+      oocPartitionCounts.push(count);
+    }
+    return count;
+  }
+
+  /**
+   * Generates the commands for a particular type of data we want to offload
+   * to disk, enqueueing one partitionId per command while the command queue
+   * has room.
+   *
+   * @param pairs list of pair(partitionId, approximate foot-print that is
+   *              going to be reduced by offloading the particular data of a
+   *              partition)
+   * @param commands queue of partitionIds for which we want to execute the
+   *                 command
+   * @return approximate amount of memory (in MB) that is going to be freed up
+   *         after executing the generated commands
+   */
+  private double createCommand(PairList<Integer, Integer> pairs,
+      BlockingQueue<Integer> commands) {
+    double usedMemory = 0;
+    if (pairs.getSize() != 0) {
+      PairList<Integer, Integer>.Iterator iterator = pairs.getIterator();
+      // Generating commands for out-of-core processor threads to
+      // offload data as long as command queue has space.
+      // NOTE(review): the remainingCapacity() check followed by add() is only
+      // safe if this thread is the sole producer of 'commands'; with a
+      // concurrent producer add() could throw IllegalStateException on a
+      // full queue -- confirm, or prefer offer().
+      while (iterator.hasNext() &&
+          commands.remainingCapacity() > 0) {
+        iterator.next();
+        commands.add(iterator.getCurrentFirst());
+        // Having an approximation on the memory foot-print of data to offload
+        // helps us to know how much memory is going to become available by
+        // offloading the data without using internal functions to estimate
+        // free memory again. (foot-print is divided by 1024*1024, i.e. the
+        // pair's second element is in bytes and is accumulated here as MB)
+        usedMemory += iterator.getCurrentSecond() / 1024.0 / 1024.0;
+      }
+    }
+    return usedMemory;
+  }
+}

Reply via email to