http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java
new file mode 100644
index 0000000..ece2fad
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetricLong.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.metrics;
+
+/**
+ * Aggregator over metrics with long values
+ */
+public final class AggregatedMetricLong extends AggregatedMetric<Long> {
+  /**
+   * Constructor
+   */
+  public AggregatedMetricLong() {
+    min = new ValueWithHostname<>(Long.MAX_VALUE);
+    max = new ValueWithHostname<>(Long.MIN_VALUE);
+    sum = (long) 0;
+  }
+
+  @Override
+  public void addItem(Long value, String hostnamePartitionId) {
+    if (value < min.getValue()) {
+      min.set(value, hostnamePartitionId);
+    }
+    if (value > max.getValue()) {
+      max.set(value, hostnamePartitionId);
+    }
+    sum += value;
+    count++;
+  }
+
+  @Override
+  public double mean() {
+    return sum / (double) count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
index 9d8d48f..3ef7127 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/AggregatedMetrics.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.metrics;
 
 import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.OutOfCoreIOCallable;
 import org.apache.giraph.worker.BspServiceWorker;
 
 import com.google.common.collect.Maps;
@@ -31,7 +33,7 @@ import java.util.Map;
  */
 public class AggregatedMetrics {
   /** Mapping from name to aggregated metric */
-  private Map<String, AggregatedMetric> metrics = Maps.newHashMap();
+  private Map<String, AggregatedMetric<?>> metrics = Maps.newHashMap();
 
   /**
    * Add value from hostname for a metric.
@@ -43,9 +45,30 @@ public class AggregatedMetrics {
    */
   public AggregatedMetrics add(String name, long value,
                                String hostnamePartitionId) {
-    AggregatedMetric aggregatedMetric = metrics.get(name);
+    AggregatedMetricLong aggregatedMetric =
+        (AggregatedMetricLong) metrics.get(name);
     if (aggregatedMetric == null) {
-      aggregatedMetric = new AggregatedMetric();
+      aggregatedMetric = new AggregatedMetricLong();
+      metrics.put(name, aggregatedMetric);
+    }
+    aggregatedMetric.addItem(value, hostnamePartitionId);
+    return this;
+  }
+
+  /**
+   * Add value from hostname for a metric.
+   *
+   * @param name String name of metric
+   * @param value double value to track
+   * @param hostnamePartitionId String host it came from
+   * @return this
+   */
+  public AggregatedMetrics add(String name, double value,
+                               String hostnamePartitionId) {
+    AggregatedMetricDouble aggregatedMetric =
+        (AggregatedMetricDouble) metrics.get(name);
+    if (aggregatedMetric == null) {
+      aggregatedMetric = new AggregatedMetricDouble();
       metrics.put(name, aggregatedMetric);
     }
     aggregatedMetric.addItem(value, hostnamePartitionId);
@@ -71,6 +94,12 @@ public class AggregatedMetrics {
         workerMetrics.getTimeToFirstMsg(), hostname);
     add(BspServiceWorker.TIMER_WAIT_REQUESTS,
         workerMetrics.getWaitRequestsTimer(), hostname);
+    add(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK,
+        workerMetrics.getBytesLoadedFromDisk(), hostname);
+    add(OutOfCoreIOCallable.BYTES_STORE_TO_DISK,
+        workerMetrics.getBytesStoredOnDisk(), hostname);
+    add(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY,
+        workerMetrics.getGraphPercentageInMemory(), hostname);
     return this;
   }
 
@@ -89,6 +118,12 @@ public class AggregatedMetrics {
         get(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG);
     AggregatedMetric waitRequestsMicros = get(
         BspServiceWorker.TIMER_WAIT_REQUESTS);
+    AggregatedMetric bytesLoaded =
+        get(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK);
+    AggregatedMetric bytesStored =
+        get(OutOfCoreIOCallable.BYTES_STORE_TO_DISK);
+    AggregatedMetric graphInMem =
+        get(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
 
     out.println();
     out.println("--- METRICS: superstep " + superstep + " ---");
@@ -97,6 +132,9 @@ public class AggregatedMetrics {
     printAggregatedMetric(out, "network communication time", "ms", commTime);
     printAggregatedMetric(out, "time to first message", "us", timeToFirstMsg);
     printAggregatedMetric(out, "wait requests time", "us", waitRequestsMicros);
+    printAggregatedMetric(out, "bytes loaded from disk", "bytes", bytesLoaded);
+    printAggregatedMetric(out, "bytes stored to disk", "bytes", bytesStored);
+    printAggregatedMetric(out, "graph in mem", "%", graphInMem);
 
     return this;
   }
@@ -106,17 +144,17 @@ public class AggregatedMetrics {
    *
    * @param out PrintStream to write to
    * @param header String header to print.
-   * @param timeUnit String time unit of metric
+   * @param unit String unit of metric
    * @param aggregatedMetric AggregatedMetric to write
    */
   private void printAggregatedMetric(PrintStream out, String header,
-                                     String timeUnit,
+                                     String unit,
                                      AggregatedMetric aggregatedMetric) {
     if (aggregatedMetric.hasData()) {
       out.println(header);
-      out.println("  mean: " + aggregatedMetric.mean() + " " + timeUnit);
-      printValueFromHost(out, "  slowest: ", timeUnit, aggregatedMetric.max());
-      printValueFromHost(out, "  fastest: ", timeUnit, aggregatedMetric.min());
+      out.println("  mean: " + aggregatedMetric.mean() + " " + unit);
+      printValueFromHost(out, "  largest: ", unit, aggregatedMetric.max());
+      printValueFromHost(out, "  smallest: ", unit, aggregatedMetric.min());
     } else {
       out.println(header + ": NO DATA");
     }
@@ -127,12 +165,12 @@ public class AggregatedMetrics {
    *
    * @param out PrintStream to write to
    * @param prefix String to write at beginning
-   * @param timeUnit String timeUnit of metric
+   * @param unit String unit of metric
    * @param vh ValueWithHostname to write
    */
   private void printValueFromHost(PrintStream out, String prefix,
-                                  String timeUnit, ValueWithHostname vh) {
-    out.println(prefix + vh.getValue() + ' ' + timeUnit +
+                                  String unit, ValueWithHostname vh) {
+    out.println(prefix + vh.getValue() + ' ' + unit +
         " from " + vh.getHostname());
   }
 
@@ -151,7 +189,7 @@ public class AggregatedMetrics {
    *
    * @return Map of all the aggregated metrics.
    */
-  public Map<String, AggregatedMetric> getAll() {
+  public Map<String, AggregatedMetric<?>> getAll() {
     return metrics;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java 
b/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java
index 208c4c7..91a7398 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/ValueWithHostname.java
@@ -20,27 +20,29 @@ package org.apache.giraph.metrics;
 
 /**
  * Pair of value with host it came from.
+ *
+ * @param <T> type of value (either long or double)
  */
-class ValueWithHostname {
+class ValueWithHostname<T extends Number> {
   /** long value we're holding */
-  private long value;
+  private T value;
   /** host associated with value */
   private String hostname;
 
   /**
    * Create with initial value
    *
-   * @param value long initial value to use
+   * @param value initial value to use
    */
-  public ValueWithHostname(long value) {
+  public ValueWithHostname(T value) {
     this.value = value;
     this.hostname = null;
   }
 
   /**
-   * @return long value
+   * @return value
    */
-  public long getValue() {
+  public T getValue() {
     return value;
   }
 
@@ -62,10 +64,10 @@ class ValueWithHostname {
 
   /**
    * Set value and partition together.
-   * @param value long value to use.
+   * @param value value to use.
    * @param hostname String host it came from.
    */
-  public void set(long value, String hostname) {
+  public void set(T value, String hostname) {
     this.value = value;
     this.hostname = hostname;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
 
b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
index 05ec55b..219bcbd 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/metrics/WorkerSuperstepMetrics.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.metrics;
 
 import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.OutOfCoreIOCallable;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.hadoop.io.Writable;
 
@@ -28,6 +30,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Per-superstep metrics for a Worker.
@@ -43,6 +46,14 @@ public class WorkerSuperstepMetrics implements Writable {
   private LongAndTimeUnit superstepTimer;
   /** Time spent waiting for other workers to finish */
   private LongAndTimeUnit waitRequestsTimer;
+  /** Time spent doing GC in a superstep */
+  private LongAndTimeUnit superstepGCTimer;
+  /** Number of bytes loaded from disk to memory in out-of-core mechanism */
+  private long bytesLoadedFromDisk;
+  /** Number of bytes stored from memory to disk in out-of-core mechanism */
+  private long bytesStoredOnDisk;
+  /** Percentage of graph kept in memory */
+  private double graphPercentageInMemory;
 
   /**
    * Constructor
@@ -53,6 +64,11 @@ public class WorkerSuperstepMetrics implements Writable {
     timeToFirstMsg = new LongAndTimeUnit();
     superstepTimer = new LongAndTimeUnit();
     waitRequestsTimer = new LongAndTimeUnit();
+    superstepGCTimer = new LongAndTimeUnit();
+    superstepGCTimer.setTimeUnit(TimeUnit.MILLISECONDS);
+    bytesLoadedFromDisk = 0;
+    bytesStoredOnDisk = 0;
+    graphPercentageInMemory = 0;
   }
 
   /**
@@ -66,6 +82,20 @@ public class WorkerSuperstepMetrics implements Writable {
     readGiraphTimer(GraphTaskManager.TIMER_TIME_TO_FIRST_MSG, timeToFirstMsg);
     readGiraphTimer(GraphTaskManager.TIMER_SUPERSTEP_TIME, superstepTimer);
     readGiraphTimer(BspServiceWorker.TIMER_WAIT_REQUESTS, waitRequestsTimer);
+    SuperstepMetricsRegistry registry = GiraphMetrics.get().perSuperstep();
+    superstepGCTimer.setValue(
+        registry.getCounter(GraphTaskManager.TIMER_SUPERSTEP_GC_TIME).count());
+    bytesLoadedFromDisk =
+        registry.getCounter(OutOfCoreIOCallable.BYTES_LOAD_FROM_DISK).count();
+    bytesStoredOnDisk =
+        registry.getCounter(OutOfCoreIOCallable.BYTES_STORE_TO_DISK).count();
+    Gauge<Double> gauge =
+        registry.getExistingGauge(OutOfCoreEngine.GRAPH_PERCENTAGE_IN_MEMORY);
+    if (gauge != null) {
+      graphPercentageInMemory = gauge.value();
+    } else {
+      graphPercentageInMemory = 100;
+    }
     return this;
   }
 
@@ -99,6 +129,9 @@ public class WorkerSuperstepMetrics implements Writable {
     out.println("--- METRICS: superstep " + superstep + " ---");
     out.println("  superstep time: " + superstepTimer);
     out.println("  compute all partitions: " + computeAllTimer);
+    out.println("  time spent in gc: " + superstepGCTimer);
+    out.println("  bytes transferred in out-of-core: " +
+        (bytesLoadedFromDisk + bytesStoredOnDisk));
     out.println("  network communication time: " + commTimer);
     out.println("  time to first message: " + timeToFirstMsg);
     out.println("  wait on requests time: " + waitRequestsTimer);
@@ -140,6 +173,29 @@ public class WorkerSuperstepMetrics implements Writable {
     return waitRequestsTimer.getValue();
   }
 
+  /**
+   * @return number of bytes loaded from disk by out-of-core mechanism (if any
+   *         is used)
+   */
+  public long getBytesLoadedFromDisk() {
+    return bytesLoadedFromDisk;
+  }
+
+  /**
+   * @return number of bytes stored on disk by out-of-core mechanism (if any is
+   *         used)
+   */
+  public long getBytesStoredOnDisk() {
+    return bytesStoredOnDisk;
+  }
+
+  /**
+   * @return a rough estimate of percentage of graph in memory
+   */
+  public double getGraphPercentageInMemory() {
+    return graphPercentageInMemory;
+  }
+
   @Override
   public void readFields(DataInput dataInput) throws IOException {
     commTimer.setValue(dataInput.readLong());
@@ -147,6 +203,9 @@ public class WorkerSuperstepMetrics implements Writable {
     timeToFirstMsg.setValue(dataInput.readLong());
     superstepTimer.setValue(dataInput.readLong());
     waitRequestsTimer.setValue(dataInput.readLong());
+    bytesLoadedFromDisk = dataInput.readLong();
+    bytesStoredOnDisk = dataInput.readLong();
+    graphPercentageInMemory = dataInput.readDouble();
   }
 
   @Override
@@ -156,5 +215,8 @@ public class WorkerSuperstepMetrics implements Writable {
     dataOutput.writeLong(timeToFirstMsg.getValue());
     dataOutput.writeLong(superstepTimer.getValue());
     dataOutput.writeLong(waitRequestsTimer.getValue());
+    dataOutput.writeLong(bytesLoadedFromDisk);
+    dataOutput.writeLong(bytesStoredOnDisk);
+    dataOutput.writeDouble(graphPercentageInMemory);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
deleted file mode 100644
index 9324239..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.data.MetaPartitionManager;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
-import org.apache.log4j.Logger;
-
-/**
- * Out-of-core engine maintaining fixed number of partitions in memory.
- */
-public class FixedOutOfCoreEngine extends OutOfCoreEngine {
-  /** Class logger. */
-  private static final Logger LOG =
-      Logger.getLogger(FixedOutOfCoreEngine.class);
-  /**
-   * When getting partitions, how many milliseconds to wait if no partition was
-   * available in memory
-   */
-  private static final long MSEC_TO_WAIT = 1000;
-  /**
-   * Dummy object to wait on until a partition becomes available in memory
-   * for processing
-   */
-  private final Object partitionAvailable = new Object();
-
-  /**
-   * Constructor
-   *
-   * @param conf Configuration
-   * @param service Service worker
-   * @param maxPartitionsInMemory Maximum number of partitions that can be kept
-   *                              in memory
-   */
-  public FixedOutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> 
conf,
-                              CentralizedServiceWorker<?, ?, ?> service,
-                              int maxPartitionsInMemory) {
-    super(conf, service);
-    this.ioScheduler = new FixedOutOfCoreIOScheduler(maxPartitionsInMemory,
-        numIOThreads, this, conf);
-  }
-
-  @Override
-  public Integer getNextPartition() {
-    Integer partitionId;
-    synchronized (partitionAvailable) {
-      while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
-        try {
-          if (LOG.isInfoEnabled()) {
-            LOG.info("getNextPartition: waiting until a partition becomes " +
-                "available!");
-          }
-          partitionAvailable.wait(MSEC_TO_WAIT);
-        } catch (InterruptedException e) {
-          throw new IllegalStateException("getNextPartition: caught " +
-              "InterruptedException while waiting to retrieve a partition to " 
+
-              "process");
-        }
-        if (jobFailed) {
-          throw new RuntimeException("Job Failed due to a failure in an " +
-              "out-of-core IO thread");
-        }
-      }
-    }
-    if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
-      partitionId = null;
-    }
-    return partitionId;
-  }
-
-  @Override
-  public void doneProcessingPartition(int partitionId) {
-    metaPartitionManager.setPartitionIsProcessed(partitionId);
-    // Put the partition in store IO command queue and announce this partition
-    // as a candidate to offload to disk.
-    if (LOG.isInfoEnabled()) {
-      LOG.info("doneProcessingPartition: processing partition " + partitionId +
-          " is done!");
-    }
-    ioScheduler.addIOCommand(new StorePartitionIOCommand(this, partitionId));
-  }
-
-  @Override
-  public void startIteration() {
-    getSuperstepLock().writeLock().lock();
-    metaPartitionManager.resetPartition();
-    ((FixedOutOfCoreIOScheduler) ioScheduler).clearStoreCommandQueue();
-    getSuperstepLock().writeLock().unlock();
-  }
-
-  @Override
-  public void retrievePartition(int partitionId) {
-    long superstep = service.getSuperstep();
-    if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
-      ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
-          superstep));
-      synchronized (partitionAvailable) {
-        while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
-          try {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("retrievePartition: waiting until partition " +
-                  partitionId + " becomes available");
-            }
-            partitionAvailable.wait();
-          } catch (InterruptedException e) {
-            throw new IllegalStateException("retrievePartition: caught " +
-                "InterruptedException while waiting to retrieve partition " +
-                partitionId);
-          }
-        }
-      }
-    }
-  }
-
-  @Override
-  public void ioCommandCompleted(IOCommand command) {
-    if (command instanceof LoadPartitionIOCommand ||
-        command instanceof WaitIOCommand) {
-      // Notifying compute threads who are waiting for a partition to become
-      // available in memory to process.
-      synchronized (partitionAvailable) {
-        partitionAvailable.notifyAll();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
deleted file mode 100644
index 2cb002f..0000000
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreIOScheduler.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.ooc.io.IOCommand;
-import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
-import org.apache.giraph.ooc.io.StoreDataBufferIOCommand;
-import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand;
-import org.apache.giraph.ooc.io.StorePartitionIOCommand;
-import org.apache.giraph.ooc.io.WaitIOCommand;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * IO Scheduler for out-of-core mechanism with fixed number of partitions in
- * memory
- */
-public class FixedOutOfCoreIOScheduler extends OutOfCoreIOScheduler {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(FixedOutOfCoreIOScheduler.class);
-  /** Maximum number of partitions to be kept in memory */
-  private final int maxPartitionsInMemory;
-  /**
-   * Number of partitions to be added (loaded) or removed (stored) to/from
-   * memory. Each outstanding load partition counts +1 and each outstanding
-   * store partition counts -1 toward this counter.
-   */
-  private final AtomicInteger deltaNumPartitionsInMemory =
-      new AtomicInteger(0);
-  /** Queue of IO commands for loading partitions to memory */
-  private final List<Queue<IOCommand>> threadLoadCommandQueue;
-  /** Queue of IO commands for storing partition on disk */
-  private final List<Queue<IOCommand>> threadStoreCommandQueue;
-  /** Whether IO threads should terminate */
-  private volatile boolean shouldTerminate;
-
-  /**
-   * Constructor
-   *  @param maxPartitionsInMemory maximum number of partitions can be kept in
-   *                              memory
-   * @param numThreads number of available IO threads (i.e. disks)
-   * @param oocEngine out-of-core engine
-   * @param conf configuration
-   */
-  public FixedOutOfCoreIOScheduler(int maxPartitionsInMemory, int numThreads,
-                                   OutOfCoreEngine oocEngine,
-                                   ImmutableClassesGiraphConfiguration conf) {
-    super(conf, oocEngine, numThreads);
-    this.maxPartitionsInMemory = maxPartitionsInMemory;
-    threadLoadCommandQueue = new ArrayList<>(numThreads);
-    threadStoreCommandQueue = new ArrayList<>(numThreads);
-    for (int i = 0; i < numThreads; ++i) {
-      threadLoadCommandQueue.add(
-          new ConcurrentLinkedQueue<IOCommand>());
-      threadStoreCommandQueue.add(
-          new ConcurrentLinkedQueue<IOCommand>());
-    }
-    shouldTerminate = false;
-  }
-
-  @Override
-  public IOCommand getNextIOCommand(int threadId) {
-    if (shouldTerminate) {
-      return null;
-    }
-    int numPartitionsInMemory =
-        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
-    IOCommand command = null;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getNextIOCommand with " + numPartitionsInMemory +
-          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
-          " on the fly");
-    }
-    // Check if we have to store a partition on disk
-    if (numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() >
-        maxPartitionsInMemory) {
-      command = threadStoreCommandQueue.get(threadId).poll();
-      if (command == null) {
-        Integer partitionId = oocEngine.getMetaPartitionManager()
-            .getOffloadPartitionId(threadId);
-        if (partitionId != null) {
-          command = new StorePartitionIOCommand(oocEngine, partitionId);
-        } else {
-          deltaNumPartitionsInMemory.getAndIncrement();
-        }
-      } else {
-        checkState(command instanceof StorePartitionIOCommand,
-            "getNextIOCommand: Illegal command type in store command queue!");
-      }
-    } else {
-      // Roll back the decrement in delta counter.
-      deltaNumPartitionsInMemory.getAndIncrement();
-    }
-
-    // Check if there is any buffers/messages of current out-of-core partitions
-    // in memory
-    if (command == null) {
-      Integer partitionId = oocEngine.getMetaPartitionManager()
-          .getOffloadPartitionBufferId(threadId);
-      if (partitionId != null) {
-        command = new StoreDataBufferIOCommand(oocEngine, partitionId,
-            StoreDataBufferIOCommand.DataBufferType.PARTITION);
-      } else {
-        partitionId = oocEngine.getMetaPartitionManager()
-            .getOffloadMessageBufferId(threadId);
-        if (partitionId != null) {
-          command = new StoreDataBufferIOCommand(oocEngine, partitionId,
-              StoreDataBufferIOCommand.DataBufferType.MESSAGE);
-        } else {
-          partitionId = oocEngine.getMetaPartitionManager()
-              .getOffloadMessageId(threadId);
-          if (partitionId != null) {
-            command = new StoreIncomingMessageIOCommand(oocEngine, 
partitionId);
-          }
-        }
-      }
-    }
-
-    // Check if we can load/prefetch a partition to memory
-    if (command == null) {
-      if (numPartitionsInMemory +
-          deltaNumPartitionsInMemory.getAndIncrement() <=
-          maxPartitionsInMemory) {
-        command = threadLoadCommandQueue.get(threadId).poll();
-        if (command == null) {
-          Integer partitionId = oocEngine.getMetaPartitionManager()
-              .getPrefetchPartitionId(threadId);
-          if (partitionId != null) {
-            command = new LoadPartitionIOCommand(oocEngine, partitionId,
-                oocEngine.getServiceWorker().getSuperstep());
-          } else {
-            deltaNumPartitionsInMemory.getAndDecrement();
-          }
-        }
-      } else {
-        // Roll back the increment in delta counter.
-        deltaNumPartitionsInMemory.getAndDecrement();
-      }
-    }
-
-    // Check if no appropriate IO command is found
-    if (command == null) {
-      command = new WaitIOCommand(oocEngine, waitInterval);
-    }
-    return command;
-  }
-
-  @Override
-  public void ioCommandCompleted(IOCommand command) {
-    if (command instanceof LoadPartitionIOCommand) {
-      deltaNumPartitionsInMemory.getAndDecrement();
-    } else if (command instanceof StorePartitionIOCommand) {
-      deltaNumPartitionsInMemory.getAndIncrement();
-    }
-    oocEngine.ioCommandCompleted(command);
-  }
-
-  @Override
-  public void addIOCommand(IOCommand ioCommand) {
-    int ownerThread = getOwnerThreadId(ioCommand.getPartitionId());
-    if (ioCommand instanceof LoadPartitionIOCommand) {
-      threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
-    } else if (ioCommand instanceof StorePartitionIOCommand) {
-      threadStoreCommandQueue.get(ownerThread).offer(ioCommand);
-    } else {
-      throw new IllegalStateException("addIOCommand: IO command type is not " +
-          "supported for addition");
-    }
-  }
-
-  @Override
-  public void shutdown() {
-    super.shutdown();
-    shouldTerminate = true;
-  }
-
-  /**
-   * Clear store command queue (should happen at the beginning of each 
iteration
-   * to eliminate eager store commands generated by OOC engine)
-   */
-  public void clearStoreCommandQueue() {
-    for (int i = 0; i < threadStoreCommandQueue.size(); ++i) {
-      threadStoreCommandQueue.get(i).clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
new file mode 100644
index 0000000..f7badcb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedPartitionsOracle.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.StorePartitionIOCommand;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Oracle for fixed out-of-core mechanism */
+public class FixedPartitionsOracle implements OutOfCoreOracle {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(FixedPartitionsOracle.class);
+  /** Maximum number of partitions to be kept in memory */
+  private final int maxPartitionsInMemory;
+  /**
+   * Number of partitions to be added (loaded) or removed (stored) to/from
+   * memory. Each outstanding load partition counts +1 and each outstanding
+   * store partition counts -1 toward this counter.
+   */
+  private final AtomicInteger deltaNumPartitionsInMemory =
+      new AtomicInteger(0);
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public FixedPartitionsOracle(ImmutableClassesGiraphConfiguration conf,
+                               OutOfCoreEngine oocEngine) {
+    this.maxPartitionsInMemory =
+        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
+    this.oocEngine = oocEngine;
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    int numPartitionsInMemory =
+        oocEngine.getMetaPartitionManager().getNumInMemoryPartitions();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getNextIOActions: calling with " + numPartitionsInMemory +
+          " partitions in memory, " + deltaNumPartitionsInMemory.get() +
+          " to be loaded");
+    }
+    int numPartitions =
+        numPartitionsInMemory + deltaNumPartitionsInMemory.get();
+    // Fixed out-of-core policy:
+    //   - if the number of partitions in memory is less than the max number of
+    //     partitions in memory, we should load a partition to memory. This
+    //     basically means we are prefetching partition's data either for the
+    //     current superstep, or for the next superstep.
+    //   - if the number of partitions in memory is equal to the the max number
+    //     of partitions in memory, we do a 'soft store', meaning, we store
+    //     processed partition to disk only if there is an unprocessed 
partition
+    //     on disk. This basically makes room for unprocessed partitions on 
disk
+    //     to be prefetched.
+    //   - if the number of partitions in memory is more than the max number of
+    //     partitions in memory, we do a 'hard store', meaning we store a
+    //     partition to disk, regardless of its processing state.
+    if (numPartitions < maxPartitionsInMemory) {
+      return new IOAction[]{
+        IOAction.LOAD_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS};
+    } else if (numPartitions > maxPartitionsInMemory) {
+      LOG.warn("getNextIOActions: number of partitions in memory passed the " +
+          "specified threshold!");
+      return new IOAction[]{
+        IOAction.STORE_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS};
+    } else {
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_TO_SWAP_PARTITION};
+    }
+  }
+
+  @Override
+  public boolean approve(IOCommand command) {
+    int numPartitionsInMemory = oocEngine.getMetaPartitionManager()
+        .getNumInMemoryPartitions();
+    // If loading a partition result in having more partition in memory, the
+    // command should be denied. Also, if number of partitions in memory is
+    // already less than the max number of partitions, any command for storing
+    // a partition should be denied.
+    if (command instanceof LoadPartitionIOCommand &&
+        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndIncrement() >
+            maxPartitionsInMemory) {
+      deltaNumPartitionsInMemory.getAndDecrement();
+      return false;
+
+    } else if (command instanceof StorePartitionIOCommand &&
+        numPartitionsInMemory + deltaNumPartitionsInMemory.getAndDecrement() <
+            maxPartitionsInMemory) {
+      deltaNumPartitionsInMemory.getAndIncrement();
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    if (command instanceof LoadPartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndDecrement();
+    } else if (command instanceof StorePartitionIOCommand) {
+      deltaNumPartitionsInMemory.getAndIncrement();
+    }
+  }
+
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
+
+  @Override
+  public void shutdown() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index bc0ece4..d4c2de5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -18,14 +18,26 @@
 
 package org.apache.giraph.ooc;
 
+import com.sun.management.GarbageCollectionNotificationInfo;
+import com.yammer.metrics.util.PercentGauge;
+import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.flow_control.FlowControl;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.ooc.data.MetaPartitionManager;
 import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -34,24 +46,41 @@ import static 
com.google.common.base.Preconditions.checkState;
 /**
  * Class to represent an out-of-core engine.
  */
-public abstract class OutOfCoreEngine {
+public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
+  /**
+   * Number of 'units of processing' after which an active thread should
+   * check-in with the out-of-core engine in order to re-claim its permission 
to
+   * stay active. For a compute thread, the 'unit of processing' is processing
+   * of one vertex, and for an input thread, the 'unit of processing' is 
reading
+   * a row of input data.
+   */
+  public static final int CHECK_IN_INTERVAL = (1 << 10) - 1;
+  /** Name of metric for percentage of graph on disk */
+  public static final String GRAPH_PERCENTAGE_IN_MEMORY = "ooc-graph-in-mem-%";
   /** Class logger. */
   private static final Logger LOG = Logger.getLogger(OutOfCoreEngine.class);
+  /**
+   * When getting partitions, how many milliseconds to wait if no partition was
+   * available in memory
+   */
+  private static final long MSEC_TO_WAIT = 10000;
   /** Service worker */
-  protected final CentralizedServiceWorker<?, ?, ?> service;
+  private final CentralizedServiceWorker<?, ?, ?> service;
+  /** Flow control used in sending requests */
+  private FlowControl flowControl;
   /** Scheduler for IO threads */
-  protected OutOfCoreIOScheduler ioScheduler;
+  private final OutOfCoreIOScheduler ioScheduler;
   /** Data structure to keep meta partition information */
-  protected final MetaPartitionManager metaPartitionManager;
-  /** How many disk (i.e. IO threads) do we have? */
-  protected final int numIOThreads;
+  private final MetaPartitionManager metaPartitionManager;
+  /** Out-of-core oracle (brain of out-of-core mechanism) */
+  private final OutOfCoreOracle oracle;
   /**
    * Whether the job should fail due to IO threads terminating because of
    * exceptions
    */
-  protected volatile boolean jobFailed = false;
-  /** Whether the out-of-core engine has initialized */
-  private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+  private volatile boolean jobFailed = false;
+  /** IO statistics collector */
+  private final OutOfCoreIOStatistics statistics;
   /**
    * Global lock for entire superstep. This lock helps to avoid overlapping of
    * out-of-core decisions (what to do next to help the out-of-core mechanism)
@@ -60,6 +89,56 @@ public abstract class OutOfCoreEngine {
   private final ReadWriteLock superstepLock = new ReentrantReadWriteLock();
   /** Callable factory for IO threads */
   private final OutOfCoreIOCallableFactory oocIOCallableFactory;
+  /**
+   * Dummy object to wait on until a partition becomes available in memory
+   * for processing
+   */
+  private final Object partitionAvailable = new Object();
+  /** How many compute threads do we have? */
+  private int numComputeThreads;
+  /** How many threads (input/compute) are processing data? */
+  private volatile int numProcessingThreads;
+  /** Semaphore used for controlling number of active threads at each moment */
+  private final AdjustableSemaphore activeThreadsPermit;
+  /**
+   * Generally, the logic in Giraph for change of the superstep happens in the
+   * following order:
+   *   (1) Compute threads are done processing all partitions
+   *   (2) Superstep number increases
+   *   (3) New message store is created and message stores are prepared
+   *   (4) Iteration over partitions starts
+   * Note that there are other operations happening at the same time as well as
+   * the above operations, but the above operations are the ones which may
+   * interfere with out-of-core operations. The goal of `superstepLock` is to
+   * isolate operations 2, 3, and 4 from the rest of computations and IO
+   * operations. Specifically, increasing the superstep counter (operation 2)
+   * should be exclusive and no IO operation should happen at the same time.
+   * This is due to the fact that prefetching mechanism uses superstep counter
+   * as a mean to identify which data should be read. That being said, 
superstep
+   * counter should be cached in out-of-core engine, and all IO operations and
+   * out-of-core logic should access superstep counter through this cached
+   * value.
+   */
+  private long superstep;
+  /**
+   * Generally, the logic of a graph computations happens in the following 
order
+   * with respect to `startIteration` and `reset` method:
+   * ...
+   * startIteration (for moving edges)
+   * ...
+   * reset (to prepare messages/partitions for superstep 0)
+   * ...
+   * startIteration (superstep 0)
+   * ...
+   * reset (to prepare messages/partitions for superstep 1)
+   * ...
+   *
+   * However, in the unit tests, we usually consider only one superstep 
(usually
+   * INPUT_SUPERSTEP), and we move through partitions multiple times. Out-of-
+   * core mechanism works only if partitions are reset in a proper way. So,
+   * we keep the following flag to reset partitions if necessary.
+   */
+  private boolean resetDone;
 
   /**
    * Constructor
@@ -71,21 +150,45 @@ public abstract class OutOfCoreEngine {
                          CentralizedServiceWorker<?, ?, ?> service) {
     this.service = service;
     this.oocIOCallableFactory = new OutOfCoreIOCallableFactory(conf, this);
-    this.numIOThreads = oocIOCallableFactory.getNumDisks();
+    /* How many disk (i.e. IO threads) do we have? */
+    int numIOThreads = oocIOCallableFactory.getNumDisks();
+    this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
-    oocIOCallableFactory.createCallable();
+    this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
+    int maxPartitionsInMemory =
+        GiraphConstants.MAX_PARTITIONS_IN_MEMORY.get(conf);
+    Class<? extends OutOfCoreOracle> oracleClass =
+        GiraphConstants.OUT_OF_CORE_ORACLE.get(conf);
+    if (maxPartitionsInMemory != 0 &&
+        oracleClass != FixedPartitionsOracle.class) {
+      LOG.warn("OutOfCoreEngine: Max number of partitions in memory is set " +
+          "but the out-of-core oracle used is not tailored for fixed " +
+          "out-of-core policy. Setting the oracle to be 
FixedPartitionsOracle");
+      oracleClass = FixedPartitionsOracle.class;
+    }
+    try {
+      Constructor<?> constructor = oracleClass.getConstructor(
+          ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
+      this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
+    } catch (NoSuchMethodException | IllegalAccessException |
+        InstantiationException | InvocationTargetException e) {
+      throw new IllegalStateException("OutOfCoreEngine: caught exception " +
+          "while creating the oracle!", e);
+    }
+    this.numComputeThreads = conf.getNumComputeThreads();
+    // At the beginning of the execution, only input threads are processing 
data
+    this.numProcessingThreads = conf.getNumInputSplitsThreads();
+    this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
+    this.superstep = BspService.INPUT_SUPERSTEP;
+    this.resetDone = false;
+    GiraphMetrics.get().addSuperstepResetObserver(this);
   }
 
   /**
    * Initialize/Start the out-of-core engine.
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      "JLM_JSR166_UTILCONCURRENT_MONITORENTER")
   public void initialize() {
-    synchronized (isInitialized) {
-      isInitialized.set(true);
-      isInitialized.notifyAll();
-    }
+    oocIOCallableFactory.createCallable();
   }
 
   /**
@@ -123,20 +226,7 @@ public abstract class OutOfCoreEngine {
    *
    * @return OutOfCoreIOScheduler
    */
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
-      "JLM_JSR166_UTILCONCURRENT_MONITORENTER")
   public OutOfCoreIOScheduler getIOScheduler() {
-    synchronized (isInitialized) {
-      while (!isInitialized.get()) {
-        try {
-          isInitialized.wait();
-        } catch (InterruptedException e) {
-          throw new IllegalStateException("getIOScheduler: " +
-              "InterruptedException while waiting for out-of-core engine to " +
-              "be initialized!");
-        }
-      }
-    }
     return ioScheduler;
   }
 
@@ -146,12 +236,11 @@ public abstract class OutOfCoreEngine {
    * @return MetaPartitionManager
    */
   public MetaPartitionManager getMetaPartitionManager() {
-    checkState(isInitialized.get());
     return metaPartitionManager;
   }
 
   /**
-   * Get a refernce to superstep lock
+   * Get a reference to superstep lock
    *
    * @return read/write lock used for global superstep lock
    */
@@ -160,6 +249,24 @@ public abstract class OutOfCoreEngine {
   }
 
   /**
+   * Get a reference to IO statistics collector
+   *
+   * @return IO statistics collector
+   */
+  public OutOfCoreIOStatistics getIOStatistics() {
+    return statistics;
+  }
+
+  /**
+   * Get a reference to out-of-core oracle
+   *
+   * @return out-of-core oracle
+   */
+  public OutOfCoreOracle getOracle() {
+    return oracle;
+  }
+
+  /**
    * Get the id of the next partition to process in the current iteration over
    * all the partitions. If all partitions are already processed, this method
    * returns null.
@@ -167,20 +274,78 @@ public abstract class OutOfCoreEngine {
    * @return id of a partition to process. 'null' if all partitions are
    *         processed in current iteration over partitions.
    */
-  public abstract Integer getNextPartition();
+  public Integer getNextPartition() {
+    Integer partitionId;
+    synchronized (partitionAvailable) {
+      while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
+        try {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("getNextPartition: waiting until a partition becomes " +
+                "available!");
+          }
+          partitionAvailable.wait(MSEC_TO_WAIT);
+        } catch (InterruptedException e) {
+          throw new IllegalStateException("getNextPartition: caught " +
+              "InterruptedException while waiting to retrieve a partition to " 
+
+              "process");
+        }
+        if (jobFailed) {
+          throw new RuntimeException("Job Failed due to a failure in an " +
+              "out-of-core IO thread!");
+        }
+      }
+      if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
+        partitionAvailable.notifyAll();
+        partitionId = null;
+      }
+    }
+    return partitionId;
+  }
 
   /**
    * Notify out-of-core engine that processing of a particular partition is 
done
    *
    * @param partitionId id of the partition that its processing is done
    */
-  public abstract void doneProcessingPartition(int partitionId);
+  public void doneProcessingPartition(int partitionId) {
+    metaPartitionManager.setPartitionIsProcessed(partitionId);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("doneProcessingPartition: processing partition " + partitionId +
+          " is done!");
+    }
+  }
 
   /**
-   * Notify out=of-core engine that iteration cycle over all partitions is 
about
+   * Notify out-of-core engine that iteration cycle over all partitions is 
about
    * to begin.
    */
-  public abstract void startIteration();
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
+  public void startIteration() {
+    if (!resetDone) {
+      superstepLock.writeLock().lock();
+      metaPartitionManager.resetPartitions();
+      superstepLock.writeLock().unlock();
+    }
+    if (superstep != BspServiceWorker.INPUT_SUPERSTEP &&
+        numProcessingThreads != numComputeThreads) {
+      // This method is only executed by the main thread, and at this point
+      // no other input/compute thread is alive. So, all the permits in
+      // `activeThreadsPermit` is available. However, now that we are changing
+      // the maximum number of active threads, we need to adjust the number
+      // of available permits on `activeThreadsPermit`.
+      activeThreadsPermit.setMaxPermits(activeThreadsPermit.availablePermits() 
*
+          numComputeThreads / numProcessingThreads);
+      numProcessingThreads = numComputeThreads;
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startIteration: with " +
+          metaPartitionManager.getNumInMemoryPartitions() +
+          " partitions in memory and " +
+          activeThreadsPermit.availablePermits() + " active threads");
+    }
+    resetDone = false;
+  }
 
   /**
    * Retrieve a particular partition. After this method is complete the
@@ -188,14 +353,43 @@ public abstract class OutOfCoreEngine {
    *
    * @param partitionId id of the partition to retrieve
    */
-  public abstract void retrievePartition(int partitionId);
+  public void retrievePartition(int partitionId) {
+    if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+      ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
+          superstep));
+      synchronized (partitionAvailable) {
+        while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+          try {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("retrievePartition: waiting until partition " +
+                  partitionId + " becomes available");
+            }
+            partitionAvailable.wait();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException("retrievePartition: caught " +
+                "InterruptedException while waiting to retrieve partition " +
+                partitionId);
+          }
+        }
+      }
+    }
+  }
 
   /**
-   * Notify out-of-core engine that an IO command is competed by an IO thread
+   * Notify out-of-core engine that an IO command is completed by an IO thread
    *
    * @param command the IO command that is completed
    */
-  public abstract void ioCommandCompleted(IOCommand command);
+  public void ioCommandCompleted(IOCommand command) {
+    oracle.commandCompleted(command);
+    if (command instanceof LoadPartitionIOCommand) {
+      // Notifying compute threads who are waiting for a partition to become
+      // available in memory to process.
+      synchronized (partitionAvailable) {
+        partitionAvailable.notifyAll();
+      }
+    }
+  }
 
   /**
    * Set a flag to fail the job.
@@ -203,4 +397,112 @@ public abstract class OutOfCoreEngine {
   public void failTheJob() {
     jobFailed = true;
   }
+
+  /**
+   * Update the fraction of processing threads that should remain active. It is
+   * the responsibility of out-of-core oracle to update the number of active
+   * threads.
+   *
+   * @param fraction the fraction of processing threads to remain active. This
+   *                 number is in range [0, 1]
+   */
+  public void updateActiveThreadsFraction(double fraction) {
+    checkState(fraction >= 0 && fraction <= 1);
+    int numActiveThreads = (int) (numProcessingThreads * fraction);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("updateActiveThreadsFraction: updating the number of active " +
+          "threads to " + numActiveThreads);
+    }
+    activeThreadsPermit.setMaxPermits(numActiveThreads);
+  }
+
+  /**
+   * A processing thread would check in with out-of-core engine every once in a
+   * while to make sure that it can still remain active. It is the
+   * responsibility of the out-of-core oracle to update the number of active
+   * threads in a way that the computation never fails, and yet achieve the
+   * optimal performance it can achieve.
+   */
+  public void activeThreadCheckIn() {
+    activeThreadsPermit.release();
+    try {
+      activeThreadsPermit.acquire();
+    } catch (InterruptedException e) {
+      LOG.error("activeThreadCheckIn: exception while acquiring a permit to " +
+          "remain an active thread");
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Notify the out-of-core engine that a processing (input/compute) thread has
+   * started.
+   */
+  public void processingThreadStart() {
+    try {
+      activeThreadsPermit.acquire();
+    } catch (InterruptedException e) {
+      LOG.error("processingThreadStart: exception while acquiring a permit to" 
+
+          " start the processing thread!");
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Notify the out-of-core engine that a processing (input/compute) thread has
+   * finished.
+   */
+  public void processingThreadFinish() {
+    activeThreadsPermit.release();
+  }
+
+  /**
+   * Reset partitions and messages meta data. Also, reset the cached value of
+   * superstep counter.
+   */
+  public void reset() {
+    metaPartitionManager.resetPartitions();
+    metaPartitionManager.resetMessages();
+    superstep = service.getSuperstep();
+    resetDone = true;
+  }
+
+  /**
+   * @return cached value of the superstep counter
+   */
+  public long getSuperstep() {
+    return superstep;
+  }
+
+  /**
+   * Notify the out-of-core engine that a GC has just been completed
+   *
+   * @param info GC information
+   */
+  public void gcCompleted(GarbageCollectionNotificationInfo info) {
+    oracle.gcCompleted(info);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new PercentGauge() {
+      @Override
+      protected double getNumerator() {
+        return metaPartitionManager.getNumInMemoryPartitions();
+      }
+
+      @Override
+      protected double getDenominator() {
+        return metaPartitionManager.getNumPartitions();
+      }
+    });
+  }
+
+  public FlowControl getFlowControl() {
+    return flowControl;
+  }
+
+  public void setFlowControl(FlowControl flowControl) {
+    this.flowControl = flowControl;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index 6c6e4ff..962bd6a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -18,7 +18,13 @@
 
 package org.apache.giraph.ooc;
 
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Histogram;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
 import org.apache.giraph.ooc.io.WaitIOCommand;
 import org.apache.log4j.Logger;
 
@@ -27,7 +33,16 @@ import java.util.concurrent.Callable;
 /**
  * IO threads for out-of-core mechanism.
  */
-public class OutOfCoreIOCallable implements Callable<Void> {
+public class OutOfCoreIOCallable implements Callable<Void>,
+    ResetSuperstepMetricsObserver {
+  /** Name of Metric for number of bytes read from disk */
+  public static final String BYTES_LOAD_FROM_DISK = "ooc-bytes-load";
+  /** Name of Metric for number of bytes written to disk */
+  public static final String BYTES_STORE_TO_DISK = "ooc-bytes-store";
+  /** Name of Metric for size of loads */
+  public static final String HISTOGRAM_LOAD_SIZE = "ooc-load-size-bytes";
+  /** Name of Metric for size of stores */
+  public static final String HISTOGRAM_STORE_SIZE = "ooc-store-size-bytes";
   /** Class logger. */
   private static final Logger LOG = 
Logger.getLogger(OutOfCoreIOCallable.class);
   /** Out-of-core engine */
@@ -36,6 +51,14 @@ public class OutOfCoreIOCallable implements Callable<Void> {
   private final String basePath;
   /** Thread id/Disk id */
   private final int diskId;
+  /** How many bytes of data is read from disk */
+  private Counter bytesReadPerSuperstep;
+  /** How many bytes of data is written to disk */
+  private Counter bytesWrittenPerSuperstep;
+  /** Size of load IO commands */
+  private Histogram histogramLoadSize;
+  /** Size of store IO commands */
+  private Histogram histogramStoreSize;
 
   /**
    * Constructor
@@ -49,10 +72,12 @@ public class OutOfCoreIOCallable implements Callable<Void> {
     this.oocEngine = oocEngine;
     this.basePath = basePath;
     this.diskId = diskId;
+    newSuperstep(GiraphMetrics.get().perSuperstep());
+    GiraphMetrics.get().addSuperstepResetObserver(this);
   }
 
   @Override
-  public Void call() {
+  public Void call() throws Exception {
     while (true) {
       oocEngine.getSuperstepLock().readLock().lock();
       IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId);
@@ -67,17 +92,44 @@ public class OutOfCoreIOCallable implements Callable<Void> {
       if (command instanceof WaitIOCommand) {
         oocEngine.getSuperstepLock().readLock().unlock();
       }
+
+      boolean commandExecuted = false;
+      long duration = 0;
+      long bytes;
       // CHECKSTYLE: stop IllegalCatch
       try {
-        command.execute(basePath);
+        long startTime = System.currentTimeMillis();
+        commandExecuted = command.execute(basePath);
+        duration = System.currentTimeMillis() - startTime;
+        bytes = command.bytesTransferred();
+        if (LOG.isInfoEnabled()) {
+          LOG.info("call: thread " + diskId + "'s command " + command +
+              " completed: bytes= " + bytes + ", duration=" + duration + ", " +
+              "bandwidth=" + String.format("%.2f", (double) bytes / duration *
+              1000 / 1024 / 1024));
+        }
       } catch (Exception e) {
         oocEngine.failTheJob();
-        LOG.info("call: execution of IO command " + command + " failed!");
+        LOG.error("call: execution of IO command " + command + " failed!");
         throw new RuntimeException(e);
       }
       // CHECKSTYLE: resume IllegalCatch
       if (!(command instanceof WaitIOCommand)) {
         oocEngine.getSuperstepLock().readLock().unlock();
+        if (bytes != 0) {
+          if (command instanceof LoadPartitionIOCommand) {
+            bytesReadPerSuperstep.inc(bytes);
+            histogramLoadSize.update(bytes);
+          } else {
+            bytesWrittenPerSuperstep.inc(bytes);
+            histogramStoreSize.update(bytes);
+          }
+        }
+      }
+
+      if (commandExecuted && duration > 0) {
+        oocEngine.getIOStatistics().update(command.getType(),
+            command.bytesTransferred(), duration);
       }
       oocEngine.getIOScheduler().ioCommandCompleted(command);
     }
@@ -86,5 +138,16 @@ public class OutOfCoreIOCallable implements Callable<Void> {
     }
     return null;
   }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    bytesReadPerSuperstep = superstepMetrics.getCounter(BYTES_LOAD_FROM_DISK);
+    bytesWrittenPerSuperstep =
+        superstepMetrics.getCounter(BYTES_STORE_TO_DISK);
+    histogramLoadSize =
+        superstepMetrics.getUniformHistogram(HISTOGRAM_LOAD_SIZE);
+    histogramStoreSize =
+        superstepMetrics.getUniformHistogram(HISTOGRAM_STORE_SIZE);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
index dee632d..6428c30 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java
@@ -22,12 +22,25 @@ import com.google.common.hash.Hashing;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.StoreDataBufferIOCommand;
+import org.apache.giraph.ooc.io.StoreIncomingMessageIOCommand;
+import org.apache.giraph.ooc.io.StorePartitionIOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Representation of IO thread scheduler for out-of-core mechanism
  */
-public abstract class OutOfCoreIOScheduler {
+public class OutOfCoreIOScheduler {
   /**
    * If an IO thread does not have any command to do, it waits for certain a
    * period and check back again to see if there exist any command to perform.
@@ -41,11 +54,18 @@ public abstract class OutOfCoreIOScheduler {
   private static final Logger LOG =
       Logger.getLogger(OutOfCoreIOScheduler.class);
   /** Out-of-core engine */
-  protected final OutOfCoreEngine oocEngine;
+  private final OutOfCoreEngine oocEngine;
   /** How much an IO thread should wait if there is no IO command */
-  protected final int waitInterval;
+  private final int waitInterval;
   /** How many disks (i.e. IO threads) do we have? */
   private final int numDisks;
+  /**
+   * Queue of IO commands for loading partitions to memory. Load commands are
+   * urgent and should be done once loading data is a viable IO command.
+   */
+  private final List<Queue<IOCommand>> threadLoadCommandQueue;
+  /** Whether IO threads should terminate */
+  private volatile boolean shouldTerminate;
 
   /**
    * Constructor
@@ -59,6 +79,12 @@ public abstract class OutOfCoreIOScheduler {
     this.oocEngine = oocEngine;
     this.numDisks = numDisks;
     this.waitInterval = OOC_WAIT_INTERVAL.get(conf);
+    threadLoadCommandQueue = new ArrayList<>(numDisks);
+    for (int i = 0; i < numDisks; ++i) {
+      threadLoadCommandQueue.add(
+          new ConcurrentLinkedQueue<IOCommand>());
+    }
+    shouldTerminate = false;
   }
 
   /**
@@ -78,26 +104,170 @@ public abstract class OutOfCoreIOScheduler {
    * @param threadId id of the thread ready to execute the next IO command
    * @return next IO command to be executed by the given thread
    */
-  public abstract IOCommand getNextIOCommand(int threadId);
+  public IOCommand getNextIOCommand(int threadId) {
+    if (shouldTerminate) {
+      return null;
+    }
+    IOCommand command = null;
+    do {
+      if (command != null && LOG.isInfoEnabled()) {
+        LOG.info("getNextIOCommand: command " + command + " was proposed to " +
+            "the oracle, but got denied. Generating another command!");
+      }
+      OutOfCoreOracle.IOAction[] actions =
+          oocEngine.getOracle().getNextIOActions();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getNextIOCommand: actions are " + Arrays.toString(actions));
+      }
+      // Check whether there are any urgent outstanding load requests
+      if (!threadLoadCommandQueue.get(threadId).isEmpty()) {
+        // Check whether loading a partition is a viable (allowed) action to do
+        boolean canLoad = false;
+        for (OutOfCoreOracle.IOAction action : actions) {
+          if (action == OutOfCoreOracle.IOAction.LOAD_PARTITION ||
+              action == OutOfCoreOracle.IOAction.LOAD_UNPROCESSED_PARTITION ||
+              action == OutOfCoreOracle.IOAction.LOAD_TO_SWAP_PARTITION ||
+              action == OutOfCoreOracle.IOAction.URGENT_LOAD_PARTITION) {
+            canLoad = true;
+            break;
+          }
+        }
+        if (canLoad) {
+          command = threadLoadCommandQueue.get(threadId).poll();
+          checkNotNull(command);
+          if (oocEngine.getOracle().approve(command)) {
+            return command;
+          } else {
+            // Loading is not viable at this moment. We should put the command
+            // back in the load queue and wait until loading becomes viable.
+            threadLoadCommandQueue.get(threadId).offer(command);
+          }
+        }
+      }
+      command = null;
+      for (OutOfCoreOracle.IOAction action : actions) {
+        Integer partitionId;
+        switch (action) {
+        case STORE_MESSAGES_AND_BUFFERS:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getOffloadPartitionBufferId(threadId);
+          if (partitionId != null) {
+            command = new StoreDataBufferIOCommand(oocEngine, partitionId,
+                StoreDataBufferIOCommand.DataBufferType.PARTITION);
+          } else {
+            partitionId = oocEngine.getMetaPartitionManager()
+                .getOffloadMessageBufferId(threadId);
+            if (partitionId != null) {
+              command = new StoreDataBufferIOCommand(oocEngine, partitionId,
+                  StoreDataBufferIOCommand.DataBufferType.MESSAGE);
+            } else {
+              partitionId = oocEngine.getMetaPartitionManager()
+                  .getOffloadMessageId(threadId);
+              if (partitionId != null) {
+                command = new StoreIncomingMessageIOCommand(oocEngine,
+                    partitionId);
+              }
+            }
+          }
+          break;
+        case STORE_PROCESSED_PARTITION:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getOffloadPartitionId(threadId);
+          if (partitionId != null &&
+              oocEngine.getMetaPartitionManager()
+                  .isPartitionProcessed(partitionId)) {
+            command = new StorePartitionIOCommand(oocEngine, partitionId);
+          }
+          break;
+        case STORE_PARTITION:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getOffloadPartitionId(threadId);
+          if (partitionId != null) {
+            command = new StorePartitionIOCommand(oocEngine, partitionId);
+          }
+          break;
+        case LOAD_UNPROCESSED_PARTITION:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getLoadPartitionId(threadId);
+          if (partitionId != null &&
+              !oocEngine.getMetaPartitionManager()
+                  .isPartitionProcessed(partitionId)) {
+            command = new LoadPartitionIOCommand(oocEngine, partitionId,
+                oocEngine.getSuperstep());
+          }
+          break;
+        case LOAD_TO_SWAP_PARTITION:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getLoadPartitionId(threadId);
+          if (partitionId != null &&
+              !oocEngine.getMetaPartitionManager()
+                  .isPartitionProcessed(partitionId) &&
+              oocEngine.getMetaPartitionManager().hasProcessedOnMemory()) {
+            command = new LoadPartitionIOCommand(oocEngine, partitionId,
+                oocEngine.getSuperstep());
+          }
+          break;
+        case LOAD_PARTITION:
+          partitionId = oocEngine.getMetaPartitionManager()
+              .getLoadPartitionId(threadId);
+          if (partitionId != null) {
+            if (oocEngine.getMetaPartitionManager()
+                .isPartitionProcessed(partitionId)) {
+              command = new LoadPartitionIOCommand(oocEngine, partitionId,
+                  oocEngine.getSuperstep() + 1);
+            } else {
+              command = new LoadPartitionIOCommand(oocEngine, partitionId,
+                  oocEngine.getSuperstep());
+            }
+          }
+          break;
+        case URGENT_LOAD_PARTITION:
+          // Do nothing
+          break;
+        default:
+          throw new IllegalStateException("getNextIOCommand: the IO action " +
+              "is not defined!");
+        }
+        if (command != null) {
+          break;
+        }
+      }
+      if (command == null) {
+        command = new WaitIOCommand(oocEngine, waitInterval);
+      }
+    } while (!oocEngine.getOracle().approve(command));
+    return command;
+  }
 
   /**
    * Notify IO scheduler that the IO command is completed
    *
    * @param command completed command
    */
-  public abstract void ioCommandCompleted(IOCommand command);
+  public void ioCommandCompleted(IOCommand command) {
+    oocEngine.ioCommandCompleted(command);
+  }
 
   /**
    * Add an IO command to the scheduling queue of the IO scheduler
    *
    * @param ioCommand IO command to add to the scheduler
    */
-  public abstract void addIOCommand(IOCommand ioCommand);
+  public void addIOCommand(IOCommand ioCommand) {
+    int ownerThread = getOwnerThreadId(ioCommand.getPartitionId());
+    if (ioCommand instanceof LoadPartitionIOCommand) {
+      threadLoadCommandQueue.get(ownerThread).offer(ioCommand);
+    } else {
+      throw new IllegalStateException("addIOCommand: IO command type is not " +
+          "supported for addition");
+    }
+  }
 
   /**
    * Shutdown/Terminate the IO scheduler, and notify all IO threads to halt
    */
   public void shutdown() {
+    shouldTerminate = true;
     if (LOG.isInfoEnabled()) {
       LOG.info("shutdown: OutOfCoreIOScheduler shutting down!");
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
new file mode 100644
index 0000000..a225a4c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOStatistics.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.ooc.io.IOCommand.IOCommandType;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class to collect statistics regarding IO operations done in out-of-core
+ * mechanism.
+ */
+public class OutOfCoreIOStatistics {
+  /**
+   * An estimate of disk bandwidth. This number is only used just at the start
+   * of the computation, and will be updated/replaced later on once a few disk
+   * operations happen.
+   */
+  public static final IntConfOption DISK_BANDWIDTH_ESTIMATE =
+      new IntConfOption("giraph.diskBandwidthEstimate", 125,
+          "An estimate of disk bandwidth (MB/s). This number is used just at " +
+              "the beginning of the computation, and it will be " +
+              "updated/replaced once a few disk operations happen.");
+  /**
+   * How many recent IO operations should we keep track of? Any report given out
+   * of this statistics collector is only based on most recent IO operations.
+   */
+  public static final IntConfOption IO_COMMAND_HISTORY_SIZE =
+      new IntConfOption("giraph.ioCommandHistorySize", 50,
+          "Number of most recent IO operations to consider for reporting the" +
+              "statistics.");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(OutOfCoreIOStatistics.class);
+  /** Estimate of disk bandwidth (bytes/second) */
+  private final AtomicLong diskBandwidthEstimate;
+  /** Cached value for IO_COMMAND_HISTORY_SIZE */
+  private final int maxHistorySize;
+  /**
+   * Coefficient/Weight of the most recent IO operation toward the disk
+   * bandwidth estimate. Basically if the disk bandwidth estimate if d, and the
+   * latest IO command happened at the rate of r, the new estimate of disk
+   * bandwidth is calculated as:
+   * d_new = updateCoefficient * r + (1 - updateCoefficient) * d
+   */
+  private final double updateCoefficient;
+  /** Queue of all recent commands */
+  private final Queue<StatisticsEntry> commandHistory;
+  /**
+   * Command statistics for each type of IO command. This is the statistics of
+   * the recent commands in the history we keep track of (with 'maxHistorySize'
+   * command in the history).
+   */
+  private final Map<IOCommandType, StatisticsEntry> aggregateStats;
+  /** How many IO command completed? */
+  private int numUpdates = 0;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param numIOThreads number of disks/IO threads
+   */
+  public OutOfCoreIOStatistics(ImmutableClassesGiraphConfiguration conf,
+                               int numIOThreads) {
+    this.diskBandwidthEstimate =
+        new AtomicLong(DISK_BANDWIDTH_ESTIMATE.get(conf) *
+            (long) GiraphConstants.ONE_MB);
+    this.maxHistorySize = IO_COMMAND_HISTORY_SIZE.get(conf);
+    this.updateCoefficient = 1.0 / maxHistorySize;
+    // Adding more entry to the capacity of the queue to have a wiggle room
+    // if all IO threads are adding/removing entries from the queue
+    this.commandHistory =
+        new ArrayBlockingQueue<>(maxHistorySize + numIOThreads);
+    this.aggregateStats = Maps.newConcurrentMap();
+    for (IOCommandType type : IOCommandType.values()) {
+      aggregateStats.put(type, new StatisticsEntry(type, 0, 0, 0));
+    }
+  }
+
+  /**
+   * Update statistics with the last IO command that is executed.
+   *
+   * @param type type of the IO command that is executed
+   * @param bytesTransferred number of bytes transferred in the last IO command
+   * @param duration duration it took for the last IO command to complete
+   *                 (milliseconds)
+   */
+  public void update(IOCommandType type, long bytesTransferred,
+                     long duration) {
+    StatisticsEntry entry = aggregateStats.get(type);
+    synchronized (entry) {
+      entry.setOccurrence(entry.getOccurrence() + 1);
+      entry.setDuration(duration + entry.getDuration());
+      entry.setBytesTransferred(bytesTransferred + entry.getBytesTransferred());
+    }
+    commandHistory.offer(
+        new StatisticsEntry(type, bytesTransferred, duration, 0));
+    if (type != IOCommandType.WAIT) {
+      // If the current estimate is 'd', the new rate is 'r', and the size of
+      // history is 'n', we can simply model all the past command's rate as:
+      // d, d, d, ..., d, r
+      // where 'd' happens for 'n-1' times. Hence the new estimate of the
+      // bandwidth would be:
+      // d_new = (d * (n-1) + r) / n = (1-1/n)*d + 1/n*r
+      // where updateCoefficient = 1/n
+      diskBandwidthEstimate.set((long)
+          (updateCoefficient * (bytesTransferred / duration * 1000) +
+              (1 - updateCoefficient) * diskBandwidthEstimate.get()));
+    }
+    if (commandHistory.size() > maxHistorySize) {
+      StatisticsEntry removedEntry = commandHistory.poll();
+      entry = aggregateStats.get(removedEntry.getType());
+      synchronized (entry) {
+        entry.setOccurrence(entry.getOccurrence() - 1);
+        entry.setDuration(entry.getDuration() - removedEntry.getDuration());
+        entry.setBytesTransferred(
+            entry.getBytesTransferred() - removedEntry.getBytesTransferred());
+      }
+    }
+    numUpdates++;
+    // Outputting log every so many commands
+    if (numUpdates % 10 == 0) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info(this);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    long waitTime = 0;
+    long loadTime = 0;
+    long bytesRead = 0;
+    long storeTime = 0;
+    long bytesWritten = 0;
+    for (Map.Entry<IOCommandType, StatisticsEntry> entry :
+        aggregateStats.entrySet()) {
+      synchronized (entry.getValue()) {
+        sb.append(entry.getKey() + ": " + entry.getValue() + ", ");
+        if (entry.getKey() == IOCommandType.WAIT) {
+          waitTime += entry.getValue().getDuration();
+        } else if (entry.getKey() == IOCommandType.LOAD_PARTITION) {
+          loadTime += entry.getValue().getDuration();
+          bytesRead += entry.getValue().getBytesTransferred();
+        } else {
+          storeTime += entry.getValue().getDuration();
+          bytesWritten += entry.getValue().getBytesTransferred();
+        }
+      }
+    }
+    sb.append(String.format("Average STORE: %.2f MB/s, ",
+        (double) bytesWritten / storeTime * 1000 / 1024 / 1024));
+    sb.append(String.format("DATA_INJECTION: %.2f MB/s, ",
+        (double) (bytesRead - bytesWritten) /
+            (waitTime + loadTime + storeTime) * 1000 / 1024 / 1024));
+    sb.append(String.format("DISK_BANDWIDTH: %.2f MB/s",
+        (double) diskBandwidthEstimate.get() / 1024 / 1024));
+
+    return sb.toString();
+  }
+
+  /**
+   * @return most recent estimate of the disk bandwidth
+   */
+  public long getDiskBandwidth() {
+    return diskBandwidthEstimate.get();
+  }
+
+  /**
+   * Get aggregate statistics of a given command type in the command history
+   *
+   * @param type type of the command to get the statistics for
+   * @return aggregate statistics for the given command type
+   */
+  public BytesDuration getCommandTypeStats(IOCommandType type) {
+    StatisticsEntry entry = aggregateStats.get(type);
+    synchronized (entry) {
+      return new BytesDuration(entry.getBytesTransferred(), entry.getDuration(),
+          entry.getOccurrence());
+    }
+  }
+
+  /**
+   * Helper class to return results of statistics collector for a certain
+   * command type
+   */
+  public static class BytesDuration {
+    /** Number of bytes transferred in a few commands of the same type */
+    private long bytes;
+    /** Duration of it took to execute a few commands of the same type */
+    private long duration;
+    /** Number of commands executed of the same type */
+    private int occurrence;
+
+    /**
+     * Constructor
+     * @param bytes number of bytes transferred
+     * @param duration duration it took to execute commands
+     * @param occurrence number of commands
+     */
+    BytesDuration(long bytes, long duration, int occurrence) {
+      this.bytes = bytes;
+      this.duration = duration;
+      this.occurrence = occurrence;
+    }
+
+    /**
+     * @return number of bytes transferred for the same command type
+     */
+    public long getBytes() {
+      return bytes;
+    }
+
+    /**
+     * @return duration it took to execute a few commands of the same type
+     */
+    public long getDuration() {
+      return duration;
+    }
+
+    /**
+     * @return number of commands that are executed of the same type
+     */
+    public int getOccurrence() {
+      return occurrence;
+    }
+  }
+
+  /**
+   * Helper class to keep statistics for a certain command type
+   */
+  private static class StatisticsEntry {
+    /** Type of the command */
+    private IOCommandType type;
+    /**
+     * Aggregate number of bytes transferred executing the particular command
+     * type in the history we keep
+     */
+    private long bytesTransferred;
+    /**
+     * Aggregate duration it took executing the particular command type in the
+     * history we keep
+     */
+    private long duration;
+    /**
+     * Number of occurrences of the particular command type in the history we
+     * keep
+     */
+    private int occurrence;
+
+    /**
+     * Constructor
+     *
+     * @param type type of the command
+     * @param bytesTransferred aggregate number of bytes transferred
+     * @param duration aggregate execution time
+     * @param occurrence number of occurrences of the particular command type
+     */
+    public StatisticsEntry(IOCommandType type, long bytesTransferred,
+                           long duration, int occurrence) {
+      this.type = type;
+      this.bytesTransferred = bytesTransferred;
+      this.duration = duration;
+      this.occurrence = occurrence;
+    }
+
+    /**
+     * @return type of the command
+     */
+    public IOCommandType getType() {
+      return type;
+    }
+
+    /**
+     * @return aggregate number of bytes transferred in the particular command
+     *         type
+     */
+    public long getBytesTransferred() {
+      return bytesTransferred;
+    }
+
+    /**
+     * Update the aggregate number of bytes transferred
+     *
+     * @param bytesTransferred aggregate number of bytes
+     */
+    public void setBytesTransferred(long bytesTransferred) {
+      this.bytesTransferred = bytesTransferred;
+    }
+
+    /**
+     * @return aggregate duration it took to execute the particular command type
+     */
+    public long getDuration() {
+      return duration;
+    }
+
+    /**
+     * Update the aggregate duration
+     *
+     * @param duration aggregate duration
+     */
+    public void setDuration(long duration) {
+      this.duration = duration;
+    }
+
+    /**
+     * @return number of occurrences of the particular command type
+     */
+    public int getOccurrence() {
+      return occurrence;
+    }
+
+    /**
+     * Update the number of occurrences of the particular command type
+     *
+     * @param occurrence number of occurrences
+     */
+    public void setOccurrence(int occurrence) {
+      this.occurrence = occurrence;
+    }
+
+    @Override
+    public String toString() {
+      if (type == IOCommandType.WAIT) {
+        return String.format("%.2f sec", duration / 1000.0);
+      } else {
+        return String.format("%.2f MB/s",
+            (double) bytesTransferred / duration * 1000 / 1024 / 1024);
+      }
+    }
+  }
+}

Reply via email to