http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
new file mode 100644
index 0000000..fa8e6bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreOracle.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.ooc.io.IOCommand;
+
+/**
+ * Interface for any out-of-core oracle. An out-of-core oracle is the brain of
+ * the out-of-core mechanism, determining/deciding on out-of-core actions (load
+ * or store) that should happen.
+ */
+public interface OutOfCoreOracle {
+  /**
+   * Different types of IO actions that can potentially lead to a more desired
+   * state of computation for out-of-core mechanism. These actions are issued
+   * based on the status of the memory (memory pressure, rate of data transfer
+   * to memory, etc.)
+   */
+  enum IOAction {
+    /**
+     * Either of:
+     *    - storing incoming messages of any partition currently on disk, or
+     *    - storing incoming messages' raw data buffer of any partition
+     *      currently on disk, or
+     *    - storing partitions' raw data buffer for those partitions that are
+     *      currently on disk.
+     */
+    STORE_MESSAGES_AND_BUFFERS,
+    /**
+     * Storing a partition that is *processed* in the current iteration cycle.
+     * This action is also known as "soft store"
+     */
+    STORE_PROCESSED_PARTITION,
+    /**
+     * Storing a partition from memory on disk, prioritizing to *processed*
+     * partitions on memory. However, if there is no *processed* partition,
+     * store should happen at any cost, even if an *unprocessed* partition has
+     * to be stored. This action is also know as "hard store".
+     */
+    STORE_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory, only if there 
are
+     * *processed* partitions in memory. This action basically initiates a swap
+     * operation.
+     */
+    LOAD_TO_SWAP_PARTITION,
+    /**
+     * Loading an *unprocessed* partition from disk to memory. This action is
+     * also known as "soft load".
+     */
+    LOAD_UNPROCESSED_PARTITION,
+    /**
+     * Loading a partition (prioritizing *unprocessed* over *processed*) from
+     * disk to memory. Loading a *processed* partition to memory is a prefetch
+     * of that partition to be processed in the next superstep. This action is
+     * also known as "hard load".
+     */
+    LOAD_PARTITION,
+    /**
+     * Loading a partition regardless of the memory situation. An out-of-core
+     * mechanism may use this action to signal IO threads that it is allowed to
+     * load a partition that is specifically requested.
+     */
+    URGENT_LOAD_PARTITION
+  }
+
+  /**
+   * Get the next set of viable IO actions to help bring memory to a more
+   * desired state.
+   *
+   * @return an array of viable IO actions, sorted from highest priority to
+   *         lowest priority
+   */
+  IOAction[] getNextIOActions();
+
+  /**
+   * Whether a command is appropriate to bring the memory to a more desired
+   * state. A command is not executed unless it is approved by the oracle. This
+   * method is specially important where there are multiple IO threads
+   * performing IO operations for the out-of-core mechanism. The approval
+   * becomes significantly important to prevent all IO threads from performing
+   * identical command type, if that is a necessity. For instance, execution of
+   * a particular command type by only one thread may bring the memory to a
+   * desired state, and the rest of IO threads may perform other types of
+   * commands.
+   *
+   * @param command the IO command that is about to execute
+   * @return 'true' if the command is approved for execution. 'false' if the
+   *         command should not be executed
+   */
+  boolean approve(IOCommand command);
+
+  /**
+   * Notification of command completion. Oracle may update its status and 
commit
+   * the changes a command may cause.
+   *
+   * @param command the IO command that is completed
+   */
+  void commandCompleted(IOCommand command);
+
+  /**
+   * Notification of GC completion. Oracle may take certain decisions based on
+   * GC information (such as amount of time it took, memory it reclaimed, etc.)
+   *
+   * @param gcInfo GC information
+   */
+  void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
+
+  /**
+   * Shut down the out-of-core oracle. Necessary specifically for cases where
+   * out-of-core oracle is using additional monitoring threads.
+   */
+  void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
new file mode 100644
index 0000000..0dfc9de
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/SimpleGCMonitoringOracle.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.google.common.collect.Maps;
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+import java.lang.management.MemoryUsage;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory state constantly at a desired state. This oracle
+ * monitors GC behavior to keep track of memory pressure.
+ *
+ * After each GC is done, this oracle retrieve statistics about the memory
+ * pressure (memory used, max memory, and how far away memory is compared to a
+ * max optimal pressure). Based on the the past 2 recent memory statistics,
+ * the oracle predicts the status of the memory, and sets the rate of 
load/store
+ * of data from/to disk. If the rate of loading data from disk is 'l', and the
+ * rate of storing data to disk is 's', the rate of data injection to memory
+ * from disk can be denoted as 'l-s'. This oracle determines what 'l-s' should
+ * be based on the prediction of memory status.
+ *
+ * Assume that based on the previous GC call the memory usage at time t_0 is
+ * m_0, and based on the most recent GC call the memory usage at time t_1 is
+ * m_1. So, the rate of memory increase is alpha = (m_1 - m_0) / (t_1 - t_0).
+ * Assume that the ideal memory pressure happens when the memory usage is
+ * m_ideal. So, at time 't_2 = t_1 + (t_1 - t_0)', we want m_ideal. That means
+ * the ideal rate would be beta = (m_ideal - m_1) / (t_2 - t_1). If the date
+ * injection rate to memory so far was i, the new injection rate should be:
+ * i_new = i - (alpha - beta)
+ */
+public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
+  /**
+   * The optimal memory pressure at which GC behavior is close to ideal. This
+   * fraction may be dependant on the GC strategy used for running a job, but
+   * generally should not be dependent on the graph processing application.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.optimalMemoryPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "shows the optimal GC behavior. This fraction may be dependent " 
+
+              "on the GC strategy used in running the job.");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleGCMonitoringOracle.class);
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Status of memory from the last GC call */
+  private GCObservation lastGCObservation;
+  /** Desired rate of data injection to memory */
+  private final AtomicLong desiredDiskToMemoryDataRate =
+      new AtomicLong(0);
+  /** Number of on the fly (outstanding) IO commands for each command type */
+  private final Map<IOCommand.IOCommandType, AtomicInteger> commandOccurrences 
=
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public SimpleGCMonitoringOracle(ImmutableClassesGiraphConfiguration conf,
+                                  OutOfCoreEngine oocEngine) {
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.oocEngine = oocEngine;
+    this.lastGCObservation = new GCObservation(-1, 0, 0);
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      commandOccurrences.put(type, new AtomicInteger(0));
+    }
+  }
+
+  @Override
+  public synchronized void gcCompleted(GarbageCollectionNotificationInfo
+                                             gcInfo) {
+    long time = System.currentTimeMillis();
+    Map<String, MemoryUsage> memAfter = gcInfo.getGcInfo()
+        .getMemoryUsageAfterGc();
+    long usedMemory = 0;
+    long maxMemory = 0;
+    for (MemoryUsage memDetail : memAfter.values()) {
+      usedMemory += memDetail.getUsed();
+      maxMemory += memDetail.getMax();
+    }
+    GCObservation observation = new GCObservation(time, usedMemory, maxMemory);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("gcCompleted: GC completed with: " + observation);
+    }
+    // Whether this is not the first GC call in the application
+    if (lastGCObservation.isValid()) {
+      long deltaDataRate =
+          lastGCObservation.getDesiredDeltaDataRate(observation);
+      long diskBandwidthEstimate =
+          oocEngine.getIOStatistics().getDiskBandwidth();
+      // Update the desired data injection rate to memory. The data injection
+      // rate cannot be less than -disk_bandwidth (the extreme case happens if
+      // we only do 'store'), and cannot be more than disk_bandwidth (the
+      // extreme case happens if we only do 'load').
+      long dataInjectionRate = desiredDiskToMemoryDataRate.get();
+      desiredDiskToMemoryDataRate.set(Math.max(
+          Math.min(desiredDiskToMemoryDataRate.get() - deltaDataRate,
+              diskBandwidthEstimate), -diskBandwidthEstimate));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("gcCompleted: changing data injection rate from " +
+            String.format("%.2f", dataInjectionRate / 1024.0 / 1024.0) +
+            " to " + String.format("%.2f", desiredDiskToMemoryDataRate.get() /
+            1024.0 / 1024.0));
+      }
+    }
+    lastGCObservation = observation;
+  }
+
+  /**
+   * Get the current data injection rate to memory based on the commands ran
+   * in the history (retrieved from statistics collector), and outstanding
+   * commands issued by the IO scheduler.
+   *
+   * @return the current data injection rate to memory
+   */
+  private long getCurrentDataInjectionRate() {
+    long effectiveBytesTransferred = 0;
+    long effectiveDuration = 0;
+    for (IOCommand.IOCommandType type : IOCommand.IOCommandType.values()) {
+      OutOfCoreIOStatistics.BytesDuration stats =
+          oocEngine.getIOStatistics().getCommandTypeStats(type);
+      int occurrence = commandOccurrences.get(type).get();
+      long typeBytesTransferred = stats.getBytes();
+      long typeDuration = stats.getDuration();
+      // If there is an outstanding command, we still do not know how many 
bytes
+      // it will transfer, and how long it will take. So, we guesstimate these
+      // numbers based on other similar commands happened in the history. We
+      // simply take the average number of bytes transferred for the particular
+      // command, and we take average duration for the particular command. We
+      // should multiply these numbers by the number of outstanding commands of
+      // this particular command type.
+      if (stats.getOccurrence() != 0) {
+        typeBytesTransferred += stats.getBytes() / stats.getOccurrence() *
+            occurrence;
+        typeDuration += stats.getDuration() / stats.getOccurrence() *
+            occurrence;
+      }
+      if (type == IOCommand.IOCommandType.LOAD_PARTITION) {
+        effectiveBytesTransferred += typeBytesTransferred;
+      } else {
+        // Store (data going out of memory), or wait (no data transferred)
+        effectiveBytesTransferred -= typeBytesTransferred;
+      }
+      effectiveDuration += typeDuration;
+    }
+    if (effectiveDuration == 0) {
+      return 0;
+    } else {
+      return effectiveBytesTransferred / effectiveDuration;
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 
0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    if (desiredRate > error) {
+      // 'l-s' is positive, we should do more load than store.
+      if (currentRate > desiredRate + error) {
+        // We should decrease 'l-s'. This can be done either by increasing 's'
+        // or issuing wait command. We prioritize wait over hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        // We should increase 'l-s'. We can simply load partitions/data.
+        return new IOAction[]{IOAction.LOAD_PARTITION};
+      } else {
+        // We are in a proper state and we should keep up with the rate. We can
+        // either soft store data or load data (hard load, since we desired 
rate
+        // is positive).
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_PARTITION};
+      }
+    } else if (desiredRate < -error) {
+      // 'l-s' is negative, we should do more store than load.
+      if (currentRate < desiredRate - error) {
+        // We should increase 'l-s', but we should be cautious. We only do soft
+        // load, or wait.
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else if (currentRate > desiredRate + error) {
+        // We should reduce 'l-s', we do hard store.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PARTITION};
+      } else {
+        // We should keep up with the rate. We can either soft store data, or
+        // soft load data.
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    } else {
+      // 'l-s' is almost zero. If current rate is over the desired rate, we do
+      // soft store. If the current rate is below the desired rate, we do soft
+      // load.
+      if (currentRate > desiredRate + error) {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION};
+      } else if (currentRate < desiredRate - error) {
+        return new IOAction[]{IOAction.LOAD_UNPROCESSED_PARTITION};
+      } else {
+        return new IOAction[]{
+          IOAction.STORE_MESSAGES_AND_BUFFERS,
+          IOAction.STORE_PROCESSED_PARTITION,
+          IOAction.LOAD_UNPROCESSED_PARTITION};
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean approve(IOCommand command) {
+    long error = (long) (oocEngine.getIOStatistics().getDiskBandwidth() * 
0.05);
+    long desiredRate = desiredDiskToMemoryDataRate.get();
+    long currentRate = getCurrentDataInjectionRate();
+    // The command is denied iff the current rate is above the desired rate and
+    // we are doing load (instead of store), or the current rate is below the
+    // desired rate and we are doing store (instead of loading).
+    if (currentRate > desiredRate + error &&
+        command instanceof LoadPartitionIOCommand) {
+      return false;
+    }
+    if (currentRate < desiredRate - error &&
+        !(command instanceof LoadPartitionIOCommand) &&
+        !(command instanceof WaitIOCommand)) {
+      return false;
+    }
+    commandOccurrences.get(command.getType()).getAndIncrement();
+    return true;
+  }
+
+  @Override
+  public void commandCompleted(IOCommand command) {
+    commandOccurrences.get(command.getType()).getAndDecrement();
+  }
+
+  @Override
+  public void shutdown() { }
+
+  /** Helper class to record memory status after GC calls */
+  private class GCObservation {
+    /** The time at which the GC happened (in milliseconds) */
+    private long time;
+    /** Amount of memory used after the GC call */
+    private long usedMemory;
+    /** Maximum amounts of memory reported by GC listener */
+    private long maxMemory;
+
+    /**
+     * Constructor
+     *
+     * @param time time of GC
+     * @param usedMemory amount of used memory after GC
+     * @param maxMemory amount of all available memory based on GC observation
+     */
+    public GCObservation(long time, long usedMemory, long maxMemory) {
+      this.time = time;
+      this.usedMemory = usedMemory;
+      this.maxMemory = maxMemory;
+    }
+
+    /**
+     * Is this a valid observation?
+     *
+     * @return true iff it is a valid observation
+     */
+    public boolean isValid() {
+      return time > 0;
+    }
+
+    /**
+     * Considering a new observation of memory status after the most recent GC,
+     * what is the desired rate for data injection to memory.
+     *
+     * @param newObservation the most recent GC observation
+     * @return desired rate of data injection to memory
+     */
+    public long getDesiredDeltaDataRate(GCObservation newObservation) {
+      long newUsedMemory = newObservation.usedMemory;
+      long newMaxMemory = newObservation.maxMemory;
+      long lastUsedMemory = usedMemory;
+      long lastMaxMemory = maxMemory;
+      // Scale the memory status of two GC observation to be the same
+      long scaledMaxMemory = Math.min(lastMaxMemory, newMaxMemory);
+      newUsedMemory =
+          (long) (((double) scaledMaxMemory / newMaxMemory) * newUsedMemory);
+      lastUsedMemory =
+          (long) (((double) scaledMaxMemory / lastMaxMemory) * lastUsedMemory);
+      long desiredUsedMemory = (long) (optimalMemoryPressure * 
scaledMaxMemory);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getDesiredDeltaDataRate: " + String.format("previous usage " 
+
+            "= %.2f MB, ", lastUsedMemory / 1024.0 / 1024.0) + String.format(
+            "current usage = %.2f MB, ", newUsedMemory / 1024.0 / 1024.0) +
+            String.format("ideal usage = %.2f MB", desiredUsedMemory / 1024.0 /
+                1024.0));
+      }
+      long interval = newObservation.time - time;
+      if (interval == 0) {
+        interval = 1;
+        LOG.warn("getDesiredDeltaRate: two GC happened almost at the same " +
+            "time!");
+      }
+      long currentDataRate = (long) ((double) (newUsedMemory -
+          lastUsedMemory) / interval * 1000);
+      long desiredDataRate = (long) ((double) (desiredUsedMemory -
+          newUsedMemory) / interval * 1000);
+      return currentDataRate - desiredDataRate;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("(usedMemory: %.2f MB, maxMemory: %.2f MB at " +
+          "time: %d ms)", usedMemory / 1024.0 / 1024.0,
+          maxMemory / 1024.0 / 1024.0, time);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
new file mode 100644
index 0000000..3e05dce
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/ThresholdBasedOracle.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
+import org.apache.giraph.comm.flow_control.FlowControl;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Out-of-core oracle to adaptively control data kept in memory, with the goal
+ * of keeping the memory usage at a desired state. Out-of-core policy in this
+ * oracle is based on several user-defined thresholds. Also, this oracle spawns
+ * a thread to periodically check the memory usage. This thread would issue
+ * manual GC calls if JVM fails to call major/full GC for a while and the 
amount
+ * of used memory is about to cause high-memory pressure. This oracle, also,
+ * monitors GC activities. The monitoring mechanism looks for major/full GC
+ * calls, and updates out-of-core decisions based on the amount of available
+ * memory after such GCs. There are three out-of-core decisions:
+ *  - Which IO operations should be done (load/offload of partitions and
+ *    messages)
+ *  - What the incoming messages rate should be (updating credits announced by
+ *    this worker in credit-based flow-control mechanism)
+ *  - How many processing threads should remain active (tethering rate of
+ *    data generation)
+ *
+ * The following table shows the relationship of these decisions and
+ * user-defined thresholds.
+ * --------------------------------------------------------------
+ * Memory Pressure     |  Manual |   IO   | Credit   | Active   |
+ * (memory usage)      |   GC?   | Action |          | Threads  |
+ * --------------------------------------------------------------
+ *                     |  Yes    | hard   |  0       |  0       |
+ *                     |         | store  |          |          |
+ * failPressure -------------------------------------------------
+ *                     |  Yes    | hard   |  0       | fraction |
+ *                     |         | store  |          |          |
+ * emergencyPressure --------------------------------------------
+ *                     |  Yes    | hard   | fraction |  max     |
+ *                     |         | store  |          |          |
+ * highPressure -------------------------------------------------
+ *                     |  No     | soft   | fraction |  max     |
+ *                     |         | store  |          |          |
+ * optimalPressure ----------------------------------------------
+ *                     |  No     | soft   |  max     |  max     |
+ *                     |         | load   |          |          |
+ * lowPressure --------------------------------------------------
+ *                     |  No     | hard   |  max     |  max     |
+ *                     |         | load   |          |          |
+ * --------------------------------------------------------------
+ *
+ */
+public class ThresholdBasedOracle implements OutOfCoreOracle {
+  /** The memory pressure at/above which the job would fail */
+  public static final FloatConfOption FAIL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.failPressure", 0.975f,
+          "The memory pressure (fraction of used memory) at/above which the " +
+              "job would fail.");
+  /**
+   * The memory pressure at which the job is close to fail, even though we were
+   * using maximal disk bandwidth and minimal network rate. We should reduce
+   * job processing rate.
+   */
+  public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "is close to fail, hence we should reduce its processing rate " +
+              "as much as possible.");
+  /** The memory pressure at which the job is suffering from GC overhead. */
+  public static final FloatConfOption HIGH_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.highPressure", 0.875f,
+          "The memory pressure (fraction of used memory) at which the job " +
+              "is suffering from GC overhead.");
+  /**
+   * The memory pressure at which we expect GC to perform optimally for a
+   * memory intensive job.
+   */
+  public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
+          "The memory pressure (fraction of used memory) at which a " +
+              "memory-intensive job shows the optimal GC behavior.");
+  /**
+   * The memory pressure at/below which the job can use more memory without
+   * suffering from GC overhead.
+   */
+  public static final FloatConfOption LOW_MEMORY_PRESSURE =
+      new FloatConfOption("giraph.memory.lowPressure", 0.7f,
+          "The memory pressure (fraction of used memory) at/below which the " +
+              "job can use more memory without suffering the performance.");
+  /** The interval at which memory observer thread wakes up. */
+  public static final LongConfOption CHECK_MEMORY_INTERVAL =
+      new LongConfOption("giraph.checkMemoryInterval", 2500,
+          "The interval/period where memory observer thread wakes up and " +
+              "monitors memory footprint (in milliseconds)");
+  /**
+   * Memory observer thread would manually call GC if major/full GC has not
+   * been called for a while. The period where we expect GC to be happened in
+   * past is specified in this parameter
+   */
+  public static final LongConfOption LAST_GC_CALL_INTERVAL =
+      new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
+          "How long after last major/full GC should we call manual GC?");
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(ThresholdBasedOracle.class);
+  /** Cached value for FAIL_MEMORY_PRESSURE */
+  private final float failMemoryPressure;
+  /** Cached value for EMERGENCY_MEMORY_PRESSURE */
+  private final float emergencyMemoryPressure;
+  /** Cached value for HIGH_MEMORY_PRESSURE */
+  private final float highMemoryPressure;
+  /** Cached value for OPTIMAL_MEMORY_PRESSURE */
+  private final float optimalMemoryPressure;
+  /** Cached value for LOW_MEMORY_PRESSURE */
+  private final float lowMemoryPressure;
+  /** Cached value for CHECK_MEMORY_INTERVAL */
+  private final long checkMemoryInterval;
+  /** Cached value for LAST_GC_CALL_INTERVAL */
+  private final long lastGCCallInterval;
+  /**
+   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
+   * credit used for credit-based flow-control mechanism)
+   */
+  private final short maxRequestsCredit;
+  /**
+   * Whether the job is shutting down. Used for terminating the memory
+   * observer thread.
+   */
+  private final CountDownLatch shouldTerminate;
+  /** Result of memory observer thread */
+  private final Future<Void> checkMemoryThreadResult;
+  /** Out-of-core engine */
+  private final OutOfCoreEngine oocEngine;
+  /** Last time a major/full GC has been called (in milliseconds) */
+  private volatile long lastMajorGCTime;
+  /** Last time a non major/full GC has been called (in milliseconds) */
+  private volatile long lastMinorGCTime;
+
+  /**
+   * Constructor
+   *
+   * @param conf configuration
+   * @param oocEngine out-of-core engine
+   */
+  public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
+                              OutOfCoreEngine oocEngine) {
+    this.failMemoryPressure = FAIL_MEMORY_PRESSURE.get(conf);
+    this.emergencyMemoryPressure = EMERGENCY_MEMORY_PRESSURE.get(conf);
+    this.highMemoryPressure = HIGH_MEMORY_PRESSURE.get(conf);
+    this.optimalMemoryPressure = OPTIMAL_MEMORY_PRESSURE.get(conf);
+    this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
+    this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
+    this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
+    this.maxRequestsCredit = (short)
+        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
+    NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
+    boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
+    checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
+        "must be enabled. Use giraph.waitForPerWorkerRequests=true");
+    this.shouldTerminate = new CountDownLatch(1);
+    this.oocEngine = oocEngine;
+    this.lastMajorGCTime = 0;
+
+    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+      @Override
+      public Callable<Void> newCallable(int callableId) {
+        return new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            while (true) {
+              boolean done = shouldTerminate.await(checkMemoryInterval,
+                  TimeUnit.MILLISECONDS);
+              if (done) {
+                break;
+              }
+              double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+              long time = System.currentTimeMillis();
+              if ((usedMemoryFraction > highMemoryPressure &&
+                  time - lastMajorGCTime >= lastGCCallInterval) ||
+                  (usedMemoryFraction > optimalMemoryPressure &&
+                  time - lastMajorGCTime >= lastGCCallInterval &&
+                  time - lastMinorGCTime >= lastGCCallInterval)) {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("call: last GC happened a while ago and the " +
+                      "amount of used memory is high (used memory " +
+                      "fraction is " +
+                      String.format("%.2f", usedMemoryFraction) + "). " +
+                      "Calling GC manually");
+                }
+                System.gc();
+                time = System.currentTimeMillis() - time;
+                usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("call: manual GC is done. It took " +
+                      String.format("%.2f", (double) time / 1000) +
+                      " seconds. Used memory fraction is " +
+                      String.format("%.2f", usedMemoryFraction));
+                }
+              }
+              updateRates(usedMemoryFraction);
+            }
+            return null;
+          }
+        };
+      }
+    };
+    ExecutorService executor = Executors.newSingleThreadExecutor(
+        ThreadUtils.createThreadFactory("check-memory"));
+    this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
+        callableFactory.newCallable(0)));
+    executor.shutdown();
+  }
+
+  /**
+   * upon major/full GC calls.
+   */
+  /**
+   * Update statistics and rate regarding communication credits and number of
+   * active threads.
+   *
+   * @param usedMemoryFraction the fraction of used memory over max memory
+   */
+  public void updateRates(double usedMemoryFraction) {
+    // Update the fraction of processing threads that should remain active
+    if (usedMemoryFraction >= failMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(0);
+    } else if (usedMemoryFraction < emergencyMemoryPressure) {
+      oocEngine.updateActiveThreadsFraction(1);
+    } else {
+      oocEngine.updateActiveThreadsFraction(1 -
+          (usedMemoryFraction - emergencyMemoryPressure) /
+              (failMemoryPressure - emergencyMemoryPressure));
+    }
+
+    // Update the fraction of credit that should be used in credit-based flow-
+    // control
+    if (usedMemoryFraction >= emergencyMemoryPressure) {
+      updateRequestsCredit((short) 0);
+    } else if (usedMemoryFraction < optimalMemoryPressure) {
+      updateRequestsCredit(maxRequestsCredit);
+    } else {
+      updateRequestsCredit((short) (maxRequestsCredit *
+          (1 - (usedMemoryFraction - optimalMemoryPressure) /
+              (emergencyMemoryPressure - optimalMemoryPressure))));
+    }
+  }
+
+  @Override
+  public IOAction[] getNextIOActions() {
+    double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("getNextIOActions: usedMemoryFraction = %.2f",
+          usedMemoryFraction));
+    }
+    if (usedMemoryFraction > highMemoryPressure) {
+      return new IOAction[]{
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PARTITION};
+    } else if (usedMemoryFraction > optimalMemoryPressure) {
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.STORE_PROCESSED_PARTITION};
+    } else if (usedMemoryFraction > lowMemoryPressure) {
+      return new IOAction[]{
+        IOAction.LOAD_UNPROCESSED_PARTITION,
+        IOAction.STORE_MESSAGES_AND_BUFFERS,
+        IOAction.LOAD_PARTITION};
+    } else {
+      return new IOAction[]{IOAction.LOAD_PARTITION};
+    }
+  }
+
  @Override
  public boolean approve(IOCommand command) {
    // This oracle never vetoes a command once issued; all decision making
    // happens up front in getNextIOActions().
    return true;
  }
+
  @Override
  public void commandCompleted(IOCommand command) {
    // This oracle keeps no per-command state, so completion needs no action.
  }
+
+  @Override
+  public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) {
+    String gcAction = gcInfo.getGcAction().toLowerCase();
+    if (gcAction.contains("full") || gcAction.contains("major")) {
+      if (!gcInfo.getGcCause().contains("No GC")) {
+        lastMajorGCTime = System.currentTimeMillis();
+      }
+    } else {
+      lastMinorGCTime = System.currentTimeMillis();
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    shouldTerminate.countDown();
+    try {
+      checkMemoryThreadResult.get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("shutdown: caught exception while waiting on check-memory " +
+          "thread to terminate!");
+      throw new IllegalStateException(e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
+    }
+  }
+
+  /**
+   * Update the credit announced for this worker in Netty. The lower the credit
+   * is, the lower rate incoming messages arrive at this worker. Thus, credit
+   * is an indirect way of controlling amount of memory incoming messages would
+   * take.
+   *
+   * @param newCredit the new credit to announce to other workers
+   */
+  private void updateRequestsCredit(short newCredit) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
+    }
+    FlowControl flowControl = oocEngine.getFlowControl();
+    if (flowControl != null) {
+      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
index 7909100..53de52f 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java
@@ -125,21 +125,21 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId, String basePath)
       throws IOException {
-    super.loadPartitionData(partitionId, getPath(basePath));
+    return super.loadPartitionData(partitionId, getPath(basePath));
   }
 
   @Override
-  public void offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId, String basePath)
       throws IOException {
-    super.offloadPartitionData(partitionId, getPath(basePath));
+    return super.offloadPartitionData(partitionId, getPath(basePath));
   }
 
   @Override
-  public void offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId, String basePath)
       throws IOException {
-    super.offloadBuffers(partitionId, getPath(basePath));
+    return super.offloadBuffers(partitionId, getPath(basePath));
   }
 
   @Override
@@ -157,8 +157,9 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void loadInMemoryPartitionData(int partitionId, String path)
+  protected long loadInMemoryPartitionData(int partitionId, String path)
       throws IOException {
+    long numBytes = 0;
     File file = new File(path);
     if (file.exists()) {
       if (LOG.isDebugEnabled()) {
@@ -170,14 +171,17 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
       DataInputStream dis = new DataInputStream(bis);
       edgeStore.readPartitionEdgeStore(partitionId, dis);
       dis.close();
+      numBytes = file.length();
       checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
           "%s.", file.getAbsoluteFile());
     }
+    return numBytes;
   }
 
   @Override
-  protected void offloadInMemoryPartitionData(int partitionId, String path)
+  protected long offloadInMemoryPartitionData(int partitionId, String path)
       throws IOException {
+    long numBytes = 0;
     if (edgeStore.hasEdgesForPartition(partitionId)) {
       File file = new File(path);
       checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " +
@@ -190,7 +194,9 @@ public class DiskBackedEdgeStore<I extends 
WritableComparable,
       DataOutputStream dos = new DataOutputStream(bos);
       edgeStore.writePartitionEdgeStore(partitionId, dos);
       dos.close();
+      numBytes = dos.size();
     }
+    return numBytes;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index 2a40a58..9e56d99 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -20,6 +20,7 @@ package org.apache.giraph.ooc.data;
 
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.VertexIdMessages;
@@ -60,6 +61,8 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
   private final boolean useMessageCombiner;
   /** Which superstep this message store is used for */
   private final long superstep;
+  /** Message value class */
+  private final MessageValueFactory<M> messageValueFactory;
 
   /**
    * Type of VertexIdMessage class (container for serialized messages) received
@@ -94,6 +97,7 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
     this.messageStore = messageStore;
     this.useMessageCombiner = useMessageCombiner;
     this.superstep = superstep;
+    this.messageValueFactory = config.createOutgoingMessageValueFactory();
   }
 
   @Override
@@ -148,26 +152,33 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId, String basePath)
       throws IOException {
     if (!useMessageCombiner) {
-      super.loadPartitionData(partitionId, getPath(basePath, superstep));
+      return super.loadPartitionData(partitionId, getPath(basePath, 
superstep));
+    } else {
+      return 0;
     }
   }
 
   @Override
-  public void offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId, String basePath)
       throws IOException {
     if (!useMessageCombiner) {
-      super.offloadPartitionData(partitionId, getPath(basePath, superstep));
+      return
+          super.offloadPartitionData(partitionId, getPath(basePath, 
superstep));
+    } else {
+      return 0;
     }
   }
 
   @Override
-  public void offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId, String basePath)
       throws IOException {
     if (!useMessageCombiner) {
-      super.offloadBuffers(partitionId, getPath(basePath, superstep));
+      return super.offloadBuffers(partitionId, getPath(basePath, superstep));
+    } else {
+      return 0;
     }
   }
 
@@ -210,26 +221,24 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
       throw new IllegalStateException("writeEntry: serialized message " +
           "type is not supported");
     }
-    out.writeInt(messageClass.ordinal());
+    out.writeByte(messageClass.ordinal());
     messages.write(out);
   }
 
   @Override
   protected VertexIdMessages<I, M> readNextEntry(DataInput in)
       throws IOException {
-    int messageType = in.readInt();
+    byte messageType = in.readByte();
     SerializedMessageClass messageClass =
         SerializedMessageClass.values()[messageType];
     VertexIdMessages<I, M> vim;
     switch (messageClass) {
     case BYTE_ARRAY_VERTEX_ID_MESSAGES:
-      vim = new ByteArrayVertexIdMessages<>(
-          config.<M>createOutgoingMessageValueFactory());
+      vim = new ByteArrayVertexIdMessages<>(messageValueFactory);
       vim.setConf(config);
       break;
     case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
-      vim = new ByteArrayOneMessageToManyIds<>(
-          config.<M>createOutgoingMessageValueFactory());
+      vim = new ByteArrayOneMessageToManyIds<>(messageValueFactory);
       vim.setConf(config);
       break;
     default:
@@ -241,8 +250,9 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void loadInMemoryPartitionData(int partitionId, String basePath)
+  protected long loadInMemoryPartitionData(int partitionId, String basePath)
       throws IOException {
+    long numBytes = 0;
     File file = new File(basePath);
     if (file.exists()) {
       if (LOG.isDebugEnabled()) {
@@ -254,14 +264,17 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
       DataInputStream dis = new DataInputStream(bis);
       messageStore.readFieldsForPartition(dis, partitionId);
       dis.close();
+      numBytes = file.length();
       checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
           "%s.", file.getAbsoluteFile());
     }
+    return numBytes;
   }
 
   @Override
-  protected void offloadInMemoryPartitionData(int partitionId, String basePath)
+  protected long offloadInMemoryPartitionData(int partitionId, String basePath)
       throws IOException {
+    long numBytes = 0;
     if (messageStore.hasMessagesForPartition(partitionId)) {
       File file = new File(basePath);
       checkState(!file.exists(), "offloadInMemoryPartitionData: message store" 
+
@@ -275,7 +288,9 @@ public class DiskBackedMessageStore<I extends 
WritableComparable,
       messageStore.writePartition(outputStream, partitionId);
       messageStore.clearPartition(partitionId);
       outputStream.close();
+      numBytes += outputStream.size();
     }
+    return numBytes;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/6256a761/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
index 5854c8d..2a5e47a 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -130,7 +130,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   public Partition<I, V, E> removePartition(Integer partitionId) {
     // Set the partition as 'in process' so its data and messages do not get
     // spilled to disk until the remove is complete.
-    oocEngine.getMetaPartitionManager().makePartitionInaccessible(partitionId);
+    oocEngine.getMetaPartitionManager().markPartitionAsInProcess(partitionId);
     oocEngine.retrievePartition(partitionId);
     Partition<I, V, E> partition = partitionStore.removePartition(partitionId);
     checkNotNull(partition, "removePartition: partition " + partitionId +
@@ -262,8 +262,12 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
       throws IOException {
     I id = conf.createVertexId();
     id.readFields(in);
-    V value = conf.createVertexValue();
-    value.readFields(in);
+    V value = null;
+    boolean hasNullValue = in.readBoolean();
+    if (!hasNullValue) {
+      value = conf.createVertexValue();
+      value.readFields(in);
+    }
     OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
     vertex.initialize(id, value, edges);
     if (in.readBoolean()) {
@@ -291,8 +295,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void loadInMemoryPartitionData(int partitionId, String path)
+  protected long loadInMemoryPartitionData(int partitionId, String path)
       throws IOException {
+    long numBytes = 0;
     // Load vertices
     File file = new File(getVerticesPath(path));
     if (file.exists()) {
@@ -312,6 +317,7 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
         partition.putVertex(vertex);
       }
       inputStream.close();
+      numBytes += file.length();
       checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " 
+
           "%s", file.getAbsolutePath());
 
@@ -329,15 +335,17 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
         readOutEdges(inputStream, partition);
       }
       inputStream.close();
+      numBytes += file.length();
       // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
       // around.
       if (!conf.isStaticGraph() ||
-          serviceWorker.getSuperstep() == BspService.INPUT_SUPERSTEP) {
+          oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) {
         checkState(file.delete(), "loadPartition: failed to delete %s",
             file.getAbsolutePath());
       }
       partitionStore.addPartition(partition);
     }
+    return numBytes;
   }
 
   @Override
@@ -355,15 +363,15 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void loadPartitionData(int partitionId, String basePath)
+  public long loadPartitionData(int partitionId, String basePath)
       throws IOException {
-    super.loadPartitionData(partitionId, getPath(basePath));
+    return super.loadPartitionData(partitionId, getPath(basePath));
   }
 
   @Override
-  public void offloadPartitionData(int partitionId, String basePath)
+  public long offloadPartitionData(int partitionId, String basePath)
       throws IOException {
-    super.offloadPartitionData(partitionId, getPath(basePath));
+    return super.offloadPartitionData(partitionId, getPath(basePath));
   }
 
   /**
@@ -376,7 +384,13 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
       throws IOException {
     vertex.getId().write(output);
-    vertex.getValue().write(output);
+    V value = vertex.getValue();
+    if (value != null) {
+      output.writeBoolean(false);
+      value.write(output);
+    } else {
+      output.writeBoolean(true);
+    }
     output.writeBoolean(vertex.isHalted());
   }
 
@@ -395,8 +409,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  protected void offloadInMemoryPartitionData(int partitionId, String path)
+  protected long offloadInMemoryPartitionData(int partitionId, String path)
       throws IOException {
+    long numBytes = 0;
     if (partitionStore.hasPartition(partitionId)) {
       partitionVertexCount.put(partitionId,
           partitionStore.getPartitionVertexCount(partitionId));
@@ -423,13 +438,14 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
         writeVertexData(outputStream, vertex);
       }
       outputStream.close();
+      numBytes += outputStream.size();
 
       // Avoid writing back edges if we have already written them once and
       // the graph is not changing.
       // If we are in the input superstep, we need to write the files
       // at least the first time, even though the graph is static.
       file = new File(getEdgesPath(path));
-      if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
+      if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
           partitionVertexCount.get(partitionId) == null ||
           partitionVertexCount.get(partitionId) != partition.getVertexCount() 
||
           !conf.isStaticGraph() || !file.exists()) {
@@ -446,8 +462,10 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
           writeOutEdges(outputStream, vertex);
         }
         outputStream.close();
+        numBytes += outputStream.size();
       }
     }
+    return numBytes;
   }
 
   @Override
@@ -457,9 +475,9 @@ public class DiskBackedPartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void offloadBuffers(int partitionId, String basePath)
+  public long offloadBuffers(int partitionId, String basePath)
       throws IOException {
-    super.offloadBuffers(partitionId, getPath(basePath));
+    return super.offloadBuffers(partitionId, getPath(basePath));
   }
 
   @Override

Reply via email to