http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
new file mode 100644
index 0000000..6c08dfd
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -0,0 +1,947 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Class to keep meta-information about partition data, edge data, and message
+ * data of each partition on a worker.
+ */
+public class MetaPartitionManager {
  /**
   * Flag representing that no partition is left to process in the current
   * iteration cycle over all partitions.
   */
  public static final int NO_PARTITION_TO_PROCESS = -1;

  /** Class logger */
  private static final Logger LOG =
      Logger.getLogger(MetaPartitionManager.class);
  /** Different storage states for partition/message data */
  private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT };
  /**
   * Different processing states for partitions. Processing states are reset
   * at the beginning of each iteration cycle over partitions.
   */
  private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS };

  /** Number of in-memory partitions */
  private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0);
  /** Map of partitions to their meta information */
  private final ConcurrentMap<Integer, MetaPartition> partitions =
      Maps.newConcurrentMap();
  /** Partition status bookkeeping for each IO thread (indexed by thread id) */
  private final List<PerThreadPartitionStatus> perThreadPartitions;
  /**
   * For each IO thread, set of partition ids that are on disk and have
   * 'large enough' vertex/edge buffers to be offloaded on disk
   */
  private final List<Set<Integer>> perThreadVertexEdgeBuffers;
  /**
   * For each IO thread, set of partition ids that are on disk and have
   * 'large enough' message buffers to be offloaded on disk
   */
  private final List<Set<Integer>> perThreadMessageBuffers;
  /** Out-of-core engine */
  private final OutOfCoreEngine oocEngine;
  /**
   * Number of processed partitions in the current iteration cycle over all
   * partitions
   */
  private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0);
  /**
   * Random number generator to choose a thread to get one of its partitions
   * for processing
   */
  private final Random randomGenerator;
+
  /**
   * Constructor
   *
   * @param numIOThreads number of IO threads (one status slot and one pair of
   *                     buffer-candidate sets is created per thread)
   * @param oocEngine out-of-core engine
   */
  public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) {
    perThreadPartitions = new ArrayList<>(numIOThreads);
    perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads);
    perThreadMessageBuffers = new ArrayList<>(numIOThreads);
    for (int i = 0; i < numIOThreads; ++i) {
      perThreadPartitions.add(new PerThreadPartitionStatus());
      // Concurrent sets: buffer candidate ids are added/popped concurrently
      perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet());
      perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet());
    }
    this.oocEngine = oocEngine;
    this.randomGenerator = new Random();
  }
+
  /**
   * Get number of partitions currently in memory
   *
   * @return number of partitions in memory
   */
  public int getNumInMemoryPartitions() {
    return numInMemoryPartitions.get();
  }

  /**
   * Get total number of partitions known to this worker
   *
   * @return total number of partitions
   */
  public int getNumPartitions() {
    return partitions.size();
  }
+
  /**
   * Whether this worker owns a given partition
   *
   * @param partitionId id of the partition to check if this worker owns it
   * @return true if the worker owns the partition, false otherwise
   */
  public boolean hasPartition(Integer partitionId) {
    return partitions.containsKey(partitionId);
  }

  /**
   * Return the ids of all available partitions as an iterable
   *
   * @return ids of all available partitions
   */
  public Iterable<Integer> getPartitionIds() {
    return partitions.keySet();
  }
+
  /**
   * Add a partition. A newly added partition starts in memory and in the
   * 'processed' state (the MetaPartition constructor sets PROCESSED), so it is
   * not picked up until the next iteration cycle starts with resetPartition().
   *
   * @param partitionId id of a partition to add
   */
  public void addPartition(int partitionId) {
    MetaPartition meta = new MetaPartition(partitionId);
    MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
    // Check if the given partition is new (putIfAbsent returns null only on
    // the first insertion of this id)
    if (temp == null) {
      int ownerThread = oocEngine.getIOScheduler()
          .getOwnerThreadId(partitionId);
      Set<MetaPartition> partitionSet =
          perThreadPartitions.get(ownerThread).getInMemoryProcessed();
      // The underlying set is not thread-safe; guard with its monitor, the
      // same discipline used everywhere else for these sets
      synchronized (partitionSet) {
        partitionSet.add(meta);
      }
      numInMemoryPartitions.getAndIncrement();
    }
  }
+
+  /**
+   * Remove a partition. This method assumes that the partition is already
+   * retrieved and is in memory)
+   *
+   * @param partitionId id of a partition to remove
+   */
+  public void removePartition(Integer partitionId) {
+    MetaPartition meta = partitions.remove(partitionId);
+    checkState(!meta.isOnDisk());
+    numInMemoryPartitions.getAndDecrement();
+  }
+
+  /**
+   * Pops an entry from the specified set.
+   *
+   * @param set set to pop an entry from
+   * @param <T> Type of entries in the set
+   * @return popped entry from the given set
+   */
+  private static <T> T popFromSet(Set<T> set) {
+    if (!set.isEmpty()) {
+      Iterator<T> it = set.iterator();
+      T entry = it.next();
+      it.remove();
+      return entry;
+    }
+    return null;
+  }
+
+  /**
+   * Peeks an entry from the specified set.
+   *
+   * @param set set to peek an entry from
+   * @param <T> Type of entries in the set
+   * @return peeked entry from the given set
+   */
+  private static <T> T peekFromSet(Set<T> set) {
+    if (!set.isEmpty()) {
+      return set.iterator().next();
+    }
+    return null;
+  }
+
+  /**
+   * Get id of a partition to offload on disk
+   *
+   * @param threadId id of the thread who is going to store the partition on
+   *                 disk
+   * @return id of the partition to offload on disk
+   */
+  public Integer getOffloadPartitionId(int threadId) {
+    Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
+        .getInMemoryProcessed();
+    synchronized (partitionSet) {
+      MetaPartition meta = peekFromSet(partitionSet);
+      if (meta != null) {
+        return meta.getPartitionId();
+      }
+    }
+    partitionSet = perThreadPartitions.get(threadId).getInMemoryUnprocessed();
+    synchronized (partitionSet) {
+      MetaPartition meta = peekFromSet(partitionSet);
+      if (meta != null) {
+        return meta.getPartitionId();
+      }
+    }
+    return null;
+  }
+
  /**
   * Get id of a partition to offload its vertex/edge buffers on disk. Only
   * applicable during the input superstep; returns null in any other
   * superstep.
   *
   * @param threadId id of the thread who is going to store the buffers on disk
   * @return id of the partition to offload its vertex/edge buffers on disk,
   *         or null if there is no candidate
   */
  public Integer getOffloadPartitionBufferId(int threadId) {
    if (oocEngine.getServiceWorker().getSuperstep() ==
        BspServiceWorker.INPUT_SUPERSTEP) {
      Integer partitionId =
          popFromSet(perThreadVertexEdgeBuffers.get(threadId));
      if (partitionId == null) {
        // Candidate set exhausted: refill it from both the partition store
        // and the edge store, then try popping again
        DiskBackedPartitionStore<?, ?, ?> partitionStore =
            (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData()
                .getPartitionStore());
        perThreadVertexEdgeBuffers.get(threadId)
            .addAll(partitionStore.getCandidateBuffersToOffload());
        DiskBackedEdgeStore<?, ?, ?> edgeStore =
            (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData())
                .getEdgeStore();
        perThreadVertexEdgeBuffers.get(threadId)
            .addAll(edgeStore.getCandidateBuffersToOffload());
        partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
      }
      return partitionId;
    }
    return null;
  }
+
  /**
   * Get id of a partition to offload its incoming message buffers on disk.
   * Only applicable outside the input superstep; returns null during input.
   *
   * @param threadId id of the thread who is going to store the buffers on disk
   * @return id of the partition to offload its message buffer on disk, or
   *         null if there is no candidate
   */
  public Integer getOffloadMessageBufferId(int threadId) {
    if (oocEngine.getServiceWorker().getSuperstep() !=
        BspServiceWorker.INPUT_SUPERSTEP) {
      Integer partitionId =
          popFromSet(perThreadMessageBuffers.get(threadId));
      if (partitionId == null) {
        // Candidate set exhausted: refill it from the incoming message store
        // (which may not exist yet -- guard against null) and try again
        DiskBackedMessageStore<?, ?> messageStore =
            (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData()
                .getIncomingMessageStore());
        if (messageStore != null) {
          perThreadMessageBuffers.get(threadId)
              .addAll(messageStore.getCandidateBuffersToOffload());
          partitionId = popFromSet(perThreadMessageBuffers.get(threadId));
        }
      }
      return partitionId;
    }
    return null;
  }
+
+  /**
+   * Get id of a partition to offload its incoming message on disk
+   *
+   * @param threadId id of the thread who is going to store the incoming
+   *                 messages on disk
+   * @return id of the partition to offload its message on disk
+   */
+  public Integer getOffloadMessageId(int threadId) {
+    Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId)
+        .getInDiskProcessed();
+    synchronized (partitionSet) {
+      for (MetaPartition meta : partitionSet) {
+        if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
+          return meta.getPartitionId();
+        }
+      }
+    }
+    partitionSet = perThreadPartitions.get(threadId).getInDiskUnprocessed();
+    synchronized (partitionSet) {
+      for (MetaPartition meta : partitionSet) {
+        if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
+          return meta.getPartitionId();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get id of a partition to prefetch its data to memory
+   *
+   * @param threadId id of the thread who is going to load the partition data
+   * @return id of the partition to load its data to memory
+   */
+  public Integer getPrefetchPartitionId(int threadId) {
+    Set<MetaPartition> partitionSet =
+        perThreadPartitions.get(threadId).getInDiskUnprocessed();
+    synchronized (partitionSet) {
+      MetaPartition meta = peekFromSet(partitionSet);
+      return (meta != null) ? meta.getPartitionId() : null;
+    }
+  }
+
  /**
   * Mark a partition inaccessible to IO and compute threads: remove it from
   * its owner thread's partition sets and flag it as being processed so no
   * other thread picks it up.
   *
   * @param partitionId id of the partition to mark
   */
  public void makePartitionInaccessible(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    perThreadPartitions.get(oocEngine.getIOScheduler()
        .getOwnerThreadId(partitionId))
        .remove(meta);
    synchronized (meta) {
      meta.setProcessingState(ProcessingState.IN_PROCESS);
    }
  }
+
  /**
   * Mark a partition as 'PROCESSED', return it to its owner thread's
   * in-memory/processed set, and advance the processed-partition counter for
   * the current iteration cycle.
   *
   * @param partitionId id of the partition to mark
   */
  public void setPartitionIsProcessed(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    synchronized (meta) {
      meta.setProcessingState(ProcessingState.PROCESSED);
    }
    Set<MetaPartition> partitionSet = perThreadPartitions
        .get(oocEngine.getIOScheduler().getOwnerThreadId(partitionId))
        .getInMemoryProcessed();
    synchronized (partitionSet) {
      partitionSet.add(meta);
    }
    numPartitionsProcessed.getAndIncrement();
  }
+
+  /**
+   * Notify this meta store that load of a partition for a specific superstep
+   * is about to start.
+   *
+   * @param partitionId id of the partition to load to memory
+   * @param superstep superstep in which the partition is needed for
+   * @return true iff load of the given partition is viable
+   */
+  public boolean startLoadingPartition(int partitionId, long superstep) {
+    MetaPartition meta = partitions.get(partitionId);
+    synchronized (meta) {
+      boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK;
+      if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
+        shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK;
+      } else {
+        shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK;
+      }
+      return shouldLoad;
+    }
+  }
+
  /**
   * Notify this meta store that load of a partition for a specific superstep
   * is completed: mark partition data (and the relevant message store) as
   * in-memory and move the partition to the in-memory set of its owner
   * thread.
   *
   * @param partitionId id of a the partition that load is completed
   * @param superstep superstep in which the partition is loaded for
   */
  public void doneLoadingPartition(int partitionId, long superstep) {
    MetaPartition meta = partitions.get(partitionId);
    numInMemoryPartitions.getAndIncrement();
    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
    boolean removed = perThreadPartitions.get(owner)
        .remove(meta, StorageState.ON_DISK);
    // Update states only if the partition was tracked in the on-disk set, or
    // is currently in process (e.g. after makePartitionInaccessible removed
    // it from all sets)
    if (removed || meta.getProcessingState() == ProcessingState.IN_PROCESS) {
      synchronized (meta) {
        meta.setPartitionState(StorageState.IN_MEM);
        // Loading for the current superstep brings the current messages
        // along; loading for a later superstep brings the incoming messages
        if (superstep == oocEngine.getServiceWorker().getSuperstep()) {
          meta.setCurrentMessagesState(StorageState.IN_MEM);
        } else {
          meta.setIncomingMessagesState(StorageState.IN_MEM);
        }
      }
      perThreadPartitions.get(owner).add(meta, StorageState.IN_MEM);
    }
  }
+
+  /**
+   * Notify this meta store that offload of messages for a particular partition
+   * is about to start.
+   *
+   * @param partitionId id of the partition that its messages is being 
offloaded
+   * @return true iff offload of messages of the given partition is viable
+   */
+  public boolean startOffloadingMessages(int partitionId) {
+    MetaPartition meta = partitions.get(partitionId);
+    synchronized (meta) {
+      if (meta.getIncomingMessagesState() == StorageState.IN_MEM) {
+        meta.setIncomingMessagesState(StorageState.IN_TRANSIT);
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
  /**
   * Notify this meta store that offload of messages for a particular
   * partition is complete: the incoming messages are now on disk.
   *
   * @param partitionId id of the partition that its messages is offloaded to
   *                    disk
   */
  public void doneOffloadingMessages(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    synchronized (meta) {
      meta.setIncomingMessagesState(StorageState.ON_DISK);
    }
  }
+
  /**
   * Notify this meta store that offload of raw data buffers (vertex/edges/
   * messages) of a particular partition is about to start. No state is
   * tracked for raw buffers, so this is always viable.
   *
   * @param partitionId id of the partition that its buffer is being offloaded
   * @return true iff offload of buffers of the given partition is viable
   */
  public boolean startOffloadingBuffer(int partitionId) {
    // Do nothing
    return true;
  }
+
  /**
   * Notify this meta store that offload of raw data buffers (vertex/edges/
   * messages) of a particular partition is completed. No state is tracked
   * for raw buffers, so there is nothing to update.
   *
   * @param partitionId id of the partition that its buffer is offloaded
   */
  public void doneOffloadingBuffer(int partitionId) {
    // Do nothing
  }
+
+  /**
+   * Notify this meta store that offload of a partition (partition data and its
+   * current messages) is about to start.
+   *
+   * @param partitionId id of the partition that its data is being offloaded
+   * @return true iff offload of the given partition is viable
+   */
+  public boolean startOffloadingPartition(int partitionId) {
+    MetaPartition meta = partitions.get(partitionId);
+    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
+    boolean removed = perThreadPartitions.get(owner)
+        .remove(meta, StorageState.IN_MEM);
+    if (removed) {
+      synchronized (meta) {
+        meta.setPartitionState(StorageState.IN_TRANSIT);
+        meta.setCurrentMessagesState(StorageState.IN_TRANSIT);
+      }
+      perThreadPartitions.get(owner).add(meta, StorageState.IN_TRANSIT);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
  /**
   * Notify this meta store that offload of a partition (partition data and
   * its current messages) is completed: mark both as on disk and move the
   * partition from the owner thread's in-transit set to its on-disk set.
   *
   * @param partitionId id of the partition that its data is offloaded
   */
  public void doneOffloadingPartition(int partitionId) {
    // NOTE(review): the in-memory counter is decremented even when the
    // partition is not found in the in-transit set below -- confirm callers
    // always pair this with a successful startOffloadingPartition()
    numInMemoryPartitions.getAndDecrement();
    MetaPartition meta = partitions.get(partitionId);
    int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId);
    boolean removed = perThreadPartitions.get(owner)
        .remove(meta, StorageState.IN_TRANSIT);
    if (removed) {
      synchronized (meta) {
        meta.setPartitionState(StorageState.ON_DISK);
        meta.setCurrentMessagesState(StorageState.ON_DISK);
      }
      perThreadPartitions.get(owner).add(meta, StorageState.ON_DISK);
    }
  }
+
  /**
   * Reset the meta store for a new iteration cycle over all partitions:
   * every partition becomes UNPROCESSED and the per-thread sets are rolled
   * over for the new cycle.
   * Note: this is not thread-safe and should be called from a single thread.
   */
  public void resetPartition() {
    for (MetaPartition meta : partitions.values()) {
      meta.resetPartition();
    }
    int numPartition = 0;
    for (PerThreadPartitionStatus status : perThreadPartitions) {
      numPartition += status.reset();
    }
    // Sanity check: every known partition must be accounted for by exactly
    // one per-thread set
    checkState(numPartition == partitions.size());
    numPartitionsProcessed.set(0);
  }
+
+  /**
+   * Reset messages in the meta store.
+   * Note: this is not thread-safe and should be called from a single thread.
+   */
+  public void resetMessages() {
+    for (MetaPartition meta : partitions.values()) {
+      meta.resetMessages();
+    }
+    // After swapping incoming messages and current messages, it may be the 
case
+    // that a partition has data in memory (partitionState == IN_MEM), but now
+    // its current messages are on disk (currentMessageState == ON_DISK). So, 
we
+    // have to mark the partition as ON_DISK, and load its messages once it is
+    // about to be processed.
+    for (PerThreadPartitionStatus status : perThreadPartitions) {
+      Set<MetaPartition> partitionSet = status.getInMemoryUnprocessed();
+      Iterator<MetaPartition> it = partitionSet.iterator();
+      while (it.hasNext()) {
+        MetaPartition meta = it.next();
+        if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
+          it.remove();
+          status.getInDiskUnprocessed().add(meta);
+          numInMemoryPartitions.getAndDecrement();
+        }
+      }
+      partitionSet = status.getInMemoryProcessed();
+      it = partitionSet.iterator();
+      while (it.hasNext()) {
+        MetaPartition meta = it.next();
+        if (meta.getCurrentMessagesState() == StorageState.ON_DISK) {
+          it.remove();
+          status.getInDiskProcessed().add(meta);
+          numInMemoryPartitions.getAndDecrement();
+        }
+      }
+    }
+  }
+
  /**
   * Return the id of an unprocessed partition in memory. If all partitions
   * are processed, return the NO_PARTITION_TO_PROCESS signal. If there are
   * unprocessed partitions, but none is in memory, return null.
   *
   * @return id of the partition to be processed next,
   *         NO_PARTITION_TO_PROCESS if the iteration cycle is over, or null
   *         if no in-memory unprocessed partition is currently available
   */
  public Integer getNextPartition() {
    if (numPartitionsProcessed.get() >= partitions.size()) {
      return NO_PARTITION_TO_PROCESS;
    }
    // Start at a random thread's set (to spread work across IO threads) and
    // scan all threads round-robin
    int numThreads = perThreadPartitions.size();
    int index = randomGenerator.nextInt(numThreads);
    int startIndex = index;
    do {
      Set<MetaPartition> partitionSet =
          perThreadPartitions.get(index).getInMemoryUnprocessed();
      MetaPartition meta;
      synchronized (partitionSet) {
        meta = popFromSet(partitionSet);
      }
      if (meta != null) {
        // Claim the partition before releasing it to the caller
        synchronized (meta) {
          meta.setProcessingState(ProcessingState.IN_PROCESS);
          return meta.getPartitionId();
        }
      }
      index = (index + 1) % numThreads;
    } while (index != startIndex);
    return null;
  }
+
  /**
   * Whether a partition is on disk (either its data or its current messages)
   *
   * @param partitionId id of the partition to check if it is on disk
   * @return true if partition data or its current messages are on disk, false
   *         otherwise
   */
  public boolean isPartitionOnDisk(int partitionId) {
    MetaPartition meta = partitions.get(partitionId);
    return meta.isOnDisk();
  }
+
+  /**
+   * Representation of meta information of a partition
+   */
+  private static class MetaPartition {
+    /** Id of the partition */
+    private int partitionId;
+    /** Storage state of incoming messages */
+    private StorageState incomingMessagesState;
+    /** Storage state of current messages */
+    private StorageState currentMessagesState;
+    /** Storage state of partition data */
+    private StorageState partitionState;
+    /** Processing state of a partition */
+    private volatile ProcessingState processingState;
+
+    /**
+     * Constructor
+     *
+     * @param partitionId id of the partition
+     */
+    MetaPartition(int partitionId) {
+      this.partitionId = partitionId;
+      this.processingState = ProcessingState.PROCESSED;
+      this.partitionState = StorageState.IN_MEM;
+      this.currentMessagesState = StorageState.IN_MEM;
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("\nMetaData: {");
+      sb.append("ID: " + partitionId + "; ");
+      sb.append("Partition: " + partitionState + "; ");
+      sb.append("Current Messages: " + currentMessagesState + "; ");
+      sb.append("Incoming Messages: " + incomingMessagesState + "; ");
+      sb.append("Processed? : " + processingState + "}");
+      return sb.toString();
+    }
+
+    /**
+     * Get id of the partition
+     *
+     * @return id of the partition
+     */
+    public int getPartitionId() {
+      return partitionId;
+    }
+
+    /**
+     * Get storage state of incoming messages of the partition
+     *
+     * @return storage state of incoming messages
+     */
+    public StorageState getIncomingMessagesState() {
+      return incomingMessagesState;
+    }
+
+    /**
+     * Set storage state of incoming messages of the partition
+     *
+     * @param incomingMessagesState storage state of incoming messages
+     */
+    public void setIncomingMessagesState(StorageState incomingMessagesState) {
+      this.incomingMessagesState = incomingMessagesState;
+    }
+
+    /**
+     * Get storage state of current messages of the partition
+     *
+     * @return storage state of current messages
+     */
+    public StorageState getCurrentMessagesState() {
+      return currentMessagesState;
+    }
+
+    /**
+     * Set storage state of current messages of the partition
+     *
+     * @param currentMessagesState storage state of current messages
+     */
+    public void setCurrentMessagesState(StorageState currentMessagesState) {
+      this.currentMessagesState = currentMessagesState;
+    }
+
+    /**
+     * Get storage state of the partition
+     *
+     * @return storage state of the partition
+     */
+    public StorageState getPartitionState() {
+      return partitionState;
+    }
+
+    /**
+     * Set storage state of the partition
+     *
+     * @param state storage state of the partition
+     */
+    public void setPartitionState(StorageState state) {
+      this.partitionState = state;
+    }
+
+    public ProcessingState getProcessingState() {
+      return processingState;
+    }
+
+    public void setProcessingState(ProcessingState processingState) {
+      this.processingState = processingState;
+    }
+
+    /**
+     * Whether the partition is on disk (either its data or its current
+     * messages)
+     *
+     * @return true if the partition is on disk, false otherwise
+     */
+    public boolean isOnDisk() {
+      return partitionState == StorageState.ON_DISK ||
+          currentMessagesState == StorageState.ON_DISK;
+    }
+
+    /**
+     * Reset the partition meta information for the next iteration cycle
+     */
+    public void resetPartition() {
+      processingState = ProcessingState.UNPROCESSED;
+    }
+
+    /**
+     * Reset messages meta information for the next iteration cycle
+     */
+    public void resetMessages() {
+      currentMessagesState = incomingMessagesState;
+      incomingMessagesState = StorageState.IN_MEM;
+    }
+  }
+
+  /**
+   * Representation of partitions' state per IO thread
+   */
+  private static class PerThreadPartitionStatus {
+    /**
+     * Contains partitions that has been processed in the current iteration
+     * cycle, and are not in use by any thread.
+     */
+    private Map<StorageState, Set<MetaPartition>>
+        processedPartitions = Maps.newConcurrentMap();
+    /**
+     * Contains partitions that has *NOT* been processed in the current
+     * iteration cycle, and are not in use by any thread.
+     */
+    private Map<StorageState, Set<MetaPartition>>
+        unprocessedPartitions = Maps.newConcurrentMap();
+
+    /**
+     * Constructor
+     */
+    public PerThreadPartitionStatus() {
+      processedPartitions.put(StorageState.IN_MEM,
+          Sets.<MetaPartition>newLinkedHashSet());
+      processedPartitions.put(StorageState.ON_DISK,
+          Sets.<MetaPartition>newLinkedHashSet());
+      processedPartitions.put(StorageState.IN_TRANSIT,
+          Sets.<MetaPartition>newLinkedHashSet());
+
+      unprocessedPartitions.put(StorageState.IN_MEM,
+          Sets.<MetaPartition>newLinkedHashSet());
+      unprocessedPartitions.put(StorageState.ON_DISK,
+          Sets.<MetaPartition>newLinkedHashSet());
+      unprocessedPartitions.put(StorageState.IN_TRANSIT,
+          Sets.<MetaPartition>newLinkedHashSet());
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("\nProcessed Partitions: " + processedPartitions + "; ");
+      sb.append("\nUnprocessedPartitions: " + unprocessedPartitions);
+      return sb.toString();
+    }
+
+    /**
+     * Get set of partitions that are in memory and are processed
+     *
+     * @return set of partition that are in memory are are processed
+     */
+    public Set<MetaPartition> getInMemoryProcessed() {
+      return processedPartitions.get(StorageState.IN_MEM);
+    }
+
+    /**
+     * Get set of partitions that are in memory are are not processed
+     *
+     * @return set of partitions that are in memory and are not processed
+     */
+    public Set<MetaPartition> getInMemoryUnprocessed() {
+      return unprocessedPartitions.get(StorageState.IN_MEM);
+    }
+
+    /**
+     * Get set of partitions that are on disk and are processed
+     *
+     * @return set of partitions that are on disk and are processed
+     */
+    public Set<MetaPartition> getInDiskProcessed() {
+      return processedPartitions.get(StorageState.ON_DISK);
+    }
+
+    /**
+     * Get set of partitions that are on disk and are not processed
+     *
+     * @return set of partitions that are on disk and are not processed
+     */
+    public Set<MetaPartition> getInDiskUnprocessed() {
+      return unprocessedPartitions.get(StorageState.ON_DISK);
+    }
+
+    /**
+     * Remove a partition from meta information
+     *
+     * @param meta meta-information of a partition to be removed
+     */
+    public void remove(MetaPartition meta) {
+      Set<MetaPartition> partitionSet;
+      partitionSet = processedPartitions.get(StorageState.IN_MEM);
+      synchronized (partitionSet) {
+        if (partitionSet.remove(meta)) {
+          return;
+        }
+      }
+      partitionSet = unprocessedPartitions.get(StorageState.IN_MEM);
+      synchronized (partitionSet) {
+        if (partitionSet.remove(meta)) {
+          return;
+        }
+      }
+      partitionSet = processedPartitions.get(StorageState.IN_TRANSIT);
+      synchronized (partitionSet) {
+        if (partitionSet.remove(meta)) {
+          return;
+        }
+      }
+      partitionSet = unprocessedPartitions.get(StorageState.IN_TRANSIT);
+      synchronized (partitionSet) {
+        if (partitionSet.remove(meta)) {
+          return;
+        }
+      }
+      partitionSet = processedPartitions.get(StorageState.ON_DISK);
+      synchronized (partitionSet) {
+        if (partitionSet.remove(meta)) {
+          return;
+        }
+      }
+      partitionSet = unprocessedPartitions.get(StorageState.ON_DISK);
+      synchronized (partitionSet) {
+        partitionSet.remove(meta);
+      }
+    }
+
+    /**
+     * Reset meta-information for the next iteration cycle over all partitions
+     *
+     * @return total number of partitions kept for this thread
+     */
+    public int reset() {
+      checkState(unprocessedPartitions.get(StorageState.IN_MEM).size() == 0);
+      checkState(unprocessedPartitions.get(StorageState.IN_TRANSIT).size() ==
+          0);
+      checkState(unprocessedPartitions.get(StorageState.ON_DISK).size() == 0);
+      unprocessedPartitions.clear();
+      unprocessedPartitions.putAll(processedPartitions);
+      processedPartitions.clear();
+      processedPartitions.put(StorageState.IN_MEM,
+          Sets.<MetaPartition>newLinkedHashSet());
+      processedPartitions.put(StorageState.ON_DISK,
+          Sets.<MetaPartition>newLinkedHashSet());
+      processedPartitions.put(StorageState.IN_TRANSIT,
+          Sets.<MetaPartition>newLinkedHashSet());
+      return unprocessedPartitions.get(StorageState.IN_MEM).size() +
+          unprocessedPartitions.get(StorageState.IN_TRANSIT).size() +
+          unprocessedPartitions.get(StorageState.ON_DISK).size();
+    }
+
+    /**
+     * Remove a partition from partition set of a given state
+     *
+     * @param meta meta partition to remove
+     * @param state state from which the partition should be removed
+     * @return true iff the partition is actually removed
+     */
+    public boolean remove(MetaPartition meta, StorageState state) {
+      boolean removed = false;
+      Set<MetaPartition> partitionSet = null;
+      if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
+        partitionSet = unprocessedPartitions.get(state);
+      } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
+        partitionSet = processedPartitions.get(state);
+      } else {
+        LOG.info("remove: partition " + meta.getPartitionId() + " is " +
+            "already being processed! This should happen only if partition " +
+            "removal is done before start of an iteration over all 
partitions");
+      }
+      if (partitionSet != null) {
+        synchronized (partitionSet) {
+          removed = partitionSet.remove(meta);
+        }
+      }
+      return removed;
+    }
+
+    /**
+     * Add a partition to partition set of a given state
+     *
+     * @param meta meta partition to add
+     * @param state state to which the partition should be added
+     */
+    public void add(MetaPartition meta, StorageState state) {
+      Set<MetaPartition> partitionSet = null;
+      if (meta.getProcessingState() == ProcessingState.UNPROCESSED) {
+        partitionSet = unprocessedPartitions.get(state);
+      } else if (meta.getProcessingState() == ProcessingState.PROCESSED) {
+        partitionSet = processedPartitions.get(state);
+      } else {
+        LOG.info("add: partition " + meta.getPartitionId() + " is already " +
+            "being processed!");
+      }
+      if (partitionSet != null) {
+        synchronized (partitionSet) {
+          partitionSet.add(meta);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
new file mode 100644
index 0000000..7d97e51
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.data;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
+
+/**
+ * This class provides basic operations for data structures that have to
+ * participate in out-of-core mechanism. Essential subclasses of this class 
are:
+ *  - DiskBackedPartitionStore (for partition data)
+ *  - DiskBackedMessageStore (for messages)
+ *  - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP)
+ * Basically, any data structure that may cause OOM to happen can be 
implemented
+ * as a subclass of this class.
+ *
+ * There are two different terms used in the rest of this class:
+ *  - "data store" refers to in-memory representation of data. Usually this is
+ *    stored per-partition in in-memory implementations of data structures. For
+ *    instance, "data store" of a DiskBackedPartitionStore would be the collection of
+ *    all partitions kept in the in-memory partition store within the
+ *    DiskBackedPartitionStore.
+ *  - "raw data buffer" refers to raw data which were supposed to be
+ *    de-serialized and added to the data store, but they remain 'as is' in the
+ *    memory because their corresponding partition is offloaded to disk and is
+ *    not available in the data store.
+ *
+ * @param <T> raw data format of the data store subclassing this class
+ */
+public abstract class OutOfCoreDataManager<T> {
+  /**
+   * Minimum size of a buffer (in bytes) to flush to disk. This is used to
+   * decide whether vertex/edge buffers are large enough to flush to disk.
+   */
+  public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
+      new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
+          "Minimum size of a buffer (in bytes) to flush to disk.");
+
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(
+      OutOfCoreDataManager.class);
+  /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
+  private final int minBufferSizeToOffload;
+  /** Set containing ids of all out-of-core partitions */
+  private final Set<Integer> hasPartitionDataOnDisk =
+      Sets.newConcurrentHashSet();
+  /**
+   * Map of partition ids to list of raw data buffers. The map will have 
entries
+   * only for partitions that their in-memory data structures are currently
+   * offloaded to disk. We keep the aggregate size of buffers for each 
partition
+   * as part of the values in the map to estimate how much memory we can free 
up
+   * if we offload data buffers of a particular partition to disk.
+   */
+  private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers =
+      Maps.newConcurrentMap();
+  /**
+   * Map of partition ids to number of raw data buffers offloaded to disk for
+   * each partition. The map will have entries only for partitions that their
+   * in-memory data structures are currently out of core. It is necessary to
+   * know the number of data buffers on disk for a particular partition when we
+   * are loading all these buffers back in memory.
+   */
+  private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk =
+      Maps.newConcurrentMap();
+  /**
+   * Lock to avoid overlapping of read and write on data associated with each
+   * partition.
+   * */
+  private final ConcurrentMap<Integer, ReadWriteLock> locks =
+      Maps.newConcurrentMap();
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   */
+  OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) {
+    this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
+  }
+
+  /**
+   * Retrieves a lock for a given partition. If the lock for the given 
partition
+   * does not exist, creates a new lock.
+   *
+   * @param partitionId id of the partition the lock is needed for
+   * @return lock for a given partition
+   */
+  private ReadWriteLock getPartitionLock(int partitionId) {
+    ReadWriteLock readWriteLock = locks.get(partitionId);
+    if (readWriteLock == null) {
+      readWriteLock = new ReentrantReadWriteLock();
+      ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock);
+      if (temp != null) {
+        readWriteLock = temp;
+      }
+    }
+    return readWriteLock;
+  }
+
+  /**
+   * Adds a data entry for a given partition to the current data store. If data
+   * of a given partition in data store is already offloaded to disk, adds the
+   * data entry to appropriate raw data buffer list.
+   *
+   * @param partitionId id of the partition to add the data entry to
+   * @param entry data entry to add
+   */
+  protected void addEntry(int partitionId, T entry) {
+    // Addition of data entries to a data store is much more common than
+    // out-of-core operations. Besides, in-memory data store implementations
+    // existing in the code base already account for parallel addition to data
+    // stores. Therefore, using read lock would optimize for parallel addition
+    // to data stores, specially for cases where the addition should happen for
+    // partitions that are entirely in memory.
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.readLock().lock();
+    if (hasPartitionDataOnDisk.contains(partitionId)) {
+      List<T> entryList = new ArrayList<>();
+      entryList.add(entry);
+      int entrySize = entrySerializedSize(entry);
+      MutablePair<Integer, List<T>> newPair =
+          new MutablePair<>(entrySize, entryList);
+      Pair<Integer, List<T>> oldPair =
+          dataBuffers.putIfAbsent(partitionId, newPair);
+      if (oldPair != null) {
+        synchronized (oldPair) {
+          newPair = (MutablePair<Integer, List<T>>) oldPair;
+          newPair.setLeft(oldPair.getLeft() + entrySize);
+          newPair.getRight().add(entry);
+        }
+      }
+    } else {
+      addEntryToImMemoryPartitionData(partitionId, entry);
+    }
+    rwLock.readLock().unlock();
+  }
+
+  /**
+   * Loads and assembles all data for a given partition, and put it into the
+   * data store.
+   *
+   * @param partitionId id of the partition to load ana assemble all data for
+   * @param basePath path to load the data from
+   * @throws IOException
+   */
+  public void loadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    if (hasPartitionDataOnDisk.contains(partitionId)) {
+      loadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+      hasPartitionDataOnDisk.remove(partitionId);
+      // Loading raw data buffers from disk if there is any and applying those
+      // to already loaded in-memory data.
+      Integer numBuffers = numDataBuffersOnDisk.remove(partitionId);
+      if (numBuffers != null) {
+        checkState(numBuffers > 0);
+        File file = new File(getBuffersPath(basePath, partitionId));
+        checkState(file.exists());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" 
+
+              " partition " + partitionId + " from " + file.getAbsolutePath());
+        }
+        FileInputStream fis = new FileInputStream(file);
+        BufferedInputStream bis = new BufferedInputStream(fis);
+        DataInputStream dis = new DataInputStream(bis);
+        for (int i = 0; i < numBuffers; ++i) {
+          T entry = readNextEntry(dis);
+          addEntryToImMemoryPartitionData(partitionId, entry);
+        }
+        dis.close();
+        checkState(file.delete(), "loadPartitionData: failed to delete %s.",
+            file.getAbsoluteFile());
+      }
+      // Applying in-memory raw data buffers to in-memory partition data.
+      Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId);
+      if (pair != null) {
+        for (T entry : pair.getValue()) {
+          addEntryToImMemoryPartitionData(partitionId, entry);
+        }
+      }
+    }
+    rwLock.writeLock().unlock();
+  }
+
+  /**
+   * Offloads partition data of a given partition in the data store to disk
+   *
+   * @param partitionId id of the partition to offload its data
+   * @param basePath path to offload the data to
+   * @throws IOException
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
+  public void offloadPartitionData(int partitionId, String basePath)
+      throws IOException {
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    hasPartitionDataOnDisk.add(partitionId);
+    rwLock.writeLock().unlock();
+    offloadInMemoryPartitionData(partitionId, getPath(basePath, partitionId));
+  }
+
+  /**
+   * Offloads raw data buffers of a given partition to disk
+   *
+   * @param partitionId id of the partition to offload its raw data buffers
+   * @param basePath path to offload the data to
+   * @throws IOException
+   */
+  public void offloadBuffers(int partitionId, String basePath)
+      throws IOException {
+    Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
+    if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
+      return;
+    }
+    ReadWriteLock rwLock = getPartitionLock(partitionId);
+    rwLock.writeLock().lock();
+    pair = dataBuffers.remove(partitionId);
+    rwLock.writeLock().unlock();
+    checkNotNull(pair);
+    checkState(!pair.getRight().isEmpty());
+    File file = new File(getBuffersPath(basePath, partitionId));
+    FileOutputStream fos = new FileOutputStream(file, true);
+    BufferedOutputStream bos = new BufferedOutputStream(fos);
+    DataOutputStream dos = new DataOutputStream(bos);
+    for (T entry : pair.getRight()) {
+      writeEntry(entry, dos);
+    }
+    dos.close();
+    int numBuffers = pair.getRight().size();
+    Integer oldNumBuffersOnDisk =
+        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
+    if (oldNumBuffersOnDisk != null) {
+      numDataBuffersOnDisk.replace(partitionId,
+          oldNumBuffersOnDisk + numBuffers);
+    }
+  }
+
+  /**
+   * Looks through all partitions that their data is not in the data store (is
+   * offloaded to disk), and sees if any of them has enough raw data buffer in
+   * memory. If so, puts that partition in a list to return.
+   *
+   * @return Set of partition ids of all partition raw buffers where the
+   *         aggregate size of buffers are large enough and it is worth 
flushing
+   *         those buffers to disk
+   */
+  public Set<Integer> getCandidateBuffersToOffload() {
+    Set<Integer> result = new HashSet<>();
+    for (Map.Entry<Integer, Pair<Integer, List<T>>> entry :
+        dataBuffers.entrySet()) {
+      if (entry.getValue().getLeft() > minBufferSizeToOffload) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Creates the path to read/write partition data from/to for a given
+   * partition.
+   *
+   * @param basePath path prefix to create the actual path from
+   * @param partitionId id of the partition
+   * @return path to read/write data from/to
+   */
+  private static String getPath(String basePath, int partitionId) {
+    return basePath + "-P" + partitionId;
+  }
+
+  /**
+   * Creates the path to read/write raw data buffers of a given partition
+   * from/to.
+   *
+   * @param basePath path prefix to create the actual path from
+   * @param partitionId id of the partition
+   * @return path to read/write raw data buffer to/from
+   */
+  private static String getBuffersPath(String basePath, int partitionId) {
+    return getPath(basePath, partitionId) + "_buffers";
+  }
+
+  /**
+   * Writes a single raw entry to a given output stream.
+   *
+   * @param entry entry to write to output
+   * @param out output stream to write the entry to
+   * @throws IOException
+   */
+  protected abstract void writeEntry(T entry, DataOutput out)
+      throws IOException;
+
+  /**
+   * Reads the next available raw entry from a given input stream.
+   *
+   * @param in input stream to read the entry from
+   * @return entry read from an input stream
+   * @throws IOException
+   */
+  protected abstract T readNextEntry(DataInput in) throws IOException;
+
+  /**
+   * Loads data of a partition into data store.
+   *
+   * @param partitionId id of the partition to load its data
+   * @param path path from which data should be loaded
+   * @throws IOException
+   */
+  protected abstract void loadInMemoryPartitionData(int partitionId,
+                                                    String path)
+      throws IOException;
+
+  /**
+   * Offloads data of a partition in data store to disk.
+   *
+   * @param partitionId id of the partition to offload to disk
+   * @param path path to which data should be offloaded
+   * @throws IOException
+   */
+  protected abstract void offloadInMemoryPartitionData(int partitionId,
+                                                       String path)
+      throws IOException;
+
+  /**
+   * Gets the size of a given entry in bytes.
+   *
+   * @param entry input entry to find its size
+   * @return size of given input entry in bytes
+   */
+  protected abstract int entrySerializedSize(T entry);
+
+  /**
+   * Adds a single entry for a given partition to the in-memory data store.
+   *
+   * @param partitionId id of the partition to add the data to
+   * @param entry input entry to add to the data store
+   */
+  protected abstract void addEntryToImMemoryPartitionData(int partitionId,
+                                                          T entry);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java
new file mode 100644
index 0000000..5260f2c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to data structures used for an out-of-core
+ * mechanism
+ */
+package org.apache.giraph.ooc.data;

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
new file mode 100644
index 0000000..eb6d2c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * Representation of an IO command (moving data to disk/memory) used in
+ * out-of-core mechanism.
+ */
public abstract class IOCommand {
  /** Id of the partition involved for the IO */
  protected final int partitionId;
  /** Out-of-core engine */
  protected final OutOfCoreEngine oocEngine;

  /**
   * Constructor
   *
   * @param oocEngine Out-of-core engine
   * @param partitionId Id of the partition involved in the IO
   */
  public IOCommand(OutOfCoreEngine oocEngine, int partitionId) {
    this.oocEngine = oocEngine;
    this.partitionId = partitionId;
  }

  /**
   * Get the id of the partition involved in the IO
   *
   * @return id of the partition
   */
  public int getPartitionId() {
    return partitionId;
  }

  /**
   * Execute (load/store of data) the IO command, and change the data stores
   * appropriately based on the data loaded/stored.
   *
   * @param basePath the base path (prefix) to the files/folders IO command
   *                 should read/write data from/to
   * @throws IOException
   */
  public abstract void execute(String basePath) throws IOException;
}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
new file mode 100644
index 0000000..c28a0da
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps). Also, this command can be used to
+ * prefetch a partition to be processed in the next superstep.
+ */
+public class LoadPartitionIOCommand extends IOCommand {
+  /**
+   * Which superstep this partition should be loaded for? (can be current
+   * superstep or next superstep -- in case of prefetching).
+   */
+  private final long superstep;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to be loaded
+   * @param superstep superstep to load the partition for
+   */
+  public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
+                                long superstep) {
+    super(oocEngine, partitionId);
+    this.superstep = superstep;
+  }
+
+  @Override
+  public void execute(String basePath) throws IOException {
+    if (oocEngine.getMetaPartitionManager()
+        .startLoadingPartition(partitionId, superstep)) {
+      long currentSuperstep = oocEngine.getServiceWorker().getSuperstep();
+      DiskBackedPartitionStore partitionStore =
+          (DiskBackedPartitionStore)
+              oocEngine.getServerData().getPartitionStore();
+      partitionStore.loadPartitionData(partitionId, basePath);
+      if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
+          superstep == currentSuperstep) {
+        DiskBackedEdgeStore edgeStore =
+            (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+        edgeStore.loadPartitionData(partitionId, basePath);
+      }
+      MessageStore messageStore;
+      if (currentSuperstep == superstep) {
+        messageStore = oocEngine.getServerData().getCurrentMessageStore();
+      } else {
+        Preconditions.checkState(superstep == currentSuperstep + 1);
+        messageStore = oocEngine.getServerData().getIncomingMessageStore();
+      }
+      if (messageStore != null) {
+        ((DiskBackedMessageStore) messageStore)
+            .loadPartitionData(partitionId, basePath);
+      }
+      oocEngine.getMetaPartitionManager()
+          .doneLoadingPartition(partitionId, superstep);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
+        "superstep = " + superstep + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
new file mode 100644
index 0000000..41a0682
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store raw data buffers on disk.
+ */
+public class StoreDataBufferIOCommand extends IOCommand {
+  /**
+   * Types of raw data buffer to offload to disk (either vertices/edges buffer
+   * in INPUT_SUPERSTEP or incoming message buffer).
+   */
+  public enum DataBufferType { PARTITION, MESSAGE };
+  /**
+   * Type of the buffer to store on disk.
+   */
+  private final DataBufferType type;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to offload its buffers
+   * @param type type of the buffer to store on disk
+   */
+  public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
+                                  int partitionId,
+                                  DataBufferType type) {
+    super(oocEngine, partitionId);
+    this.type = type;
+  }
+
+  @Override
+  public void execute(String basePath) throws IOException {
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingBuffer(partitionId)) {
+      switch (type) {
+      case PARTITION:
+        DiskBackedPartitionStore partitionStore =
+            (DiskBackedPartitionStore)
+                oocEngine.getServerData().getPartitionStore();
+        partitionStore.offloadBuffers(partitionId, basePath);
+        DiskBackedEdgeStore edgeStore =
+            (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
+        edgeStore.offloadBuffers(partitionId, basePath);
+        break;
+      case MESSAGE:
+        DiskBackedMessageStore messageStore =
+            (DiskBackedMessageStore)
+                oocEngine.getServerData().getIncomingMessageStore();
+        messageStore.offloadBuffers(partitionId, basePath);
+        break;
+      default:
+        throw new IllegalStateException("execute: requested data buffer type " 
+
+            "does not exist!");
+      }
+      oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
+        "type = " + type.name() + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
new file mode 100644
index 0000000..9c1c0a2
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * IOCommand to store incoming message of a particular partition.
+ */
+public class StoreIncomingMessageIOCommand extends IOCommand {
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param partitionId id of the partition to store its incoming messages
+   */
+  public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
+                                       int partitionId) {
+    super(oocEngine, partitionId);
+  }
+
+  @Override
+  public void execute(String basePath) throws IOException {
+    if (oocEngine.getMetaPartitionManager()
+        .startOffloadingMessages(partitionId)) {
+      DiskBackedMessageStore messageStore =
+          (DiskBackedMessageStore)
+              oocEngine.getServerData().getIncomingMessageStore();
+      checkState(messageStore != null);
+      messageStore.offloadPartitionData(partitionId, basePath);
+      oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + 
")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
new file mode 100644
index 0000000..77955dc
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
+import org.apache.giraph.ooc.data.DiskBackedMessageStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+
+/**
+ * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
+ * message data (if in compute supersteps).
+ */
/**
 * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
 * message data (if in compute supersteps).
 */
public class StorePartitionIOCommand extends IOCommand {
  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param partitionId id of the partition to store its data
   */
  public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
                                 int partitionId) {
    super(oocEngine, partitionId);
  }

  @Override
  public void execute(String basePath) throws IOException {
    // Ask the meta-partition manager for permission first; if offloading is
    // not granted (e.g. partition is in use or already offloaded), do nothing.
    if (oocEngine.getMetaPartitionManager()
        .startOffloadingPartition(partitionId)) {
      // Vertex/partition data is offloaded unconditionally.
      DiskBackedPartitionStore partitionStore =
          (DiskBackedPartitionStore)
              oocEngine.getServerData().getPartitionStore();
      partitionStore.offloadPartitionData(partitionId, basePath);
      if (oocEngine.getServiceWorker().getSuperstep() !=
          BspService.INPUT_SUPERSTEP) {
        // Compute supersteps: offload the current message store as well.
        // The store may be null (e.g. no messages yet) — hence the guard,
        // unlike the unconditional edge-store offload below.
        MessageStore messageStore =
            oocEngine.getServerData().getCurrentMessageStore();
        if (messageStore != null) {
          ((DiskBackedMessageStore) messageStore)
              .offloadPartitionData(partitionId, basePath);
        }
      } else {
        // INPUT_SUPERSTEP: there are no messages yet; offload edge data
        // instead, since edges are still accumulating in the edge store.
        DiskBackedEdgeStore edgeStore =
            (DiskBackedEdgeStore)
                oocEngine.getServerData().getEdgeStore();
        edgeStore.offloadPartitionData(partitionId, basePath);
      }
      // Notify the manager that all data for this partition is on disk.
      oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
    }
  }

  @Override
  public String toString() {
    return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
  }
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
new file mode 100644
index 0000000..b6e0546
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc.io;
+
+import org.apache.giraph.ooc.OutOfCoreEngine;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IOCommand to do nothing regarding moving data to/from disk.
+ */
+public class WaitIOCommand extends IOCommand {
+  /** How long should the disk be idle? (in milliseconds) */
+  private final long waitDuration;
+
+  /**
+   * Constructor
+   *
+   * @param oocEngine out-of-core engine
+   * @param waitDuration duration of wait
+   */
+  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
+    super(oocEngine, -1);
+    this.waitDuration = waitDuration;
+  }
+
+  @Override
+  public void execute(String basePath) throws IOException {
+    try {
+      TimeUnit.MILLISECONDS.sleep(waitDuration);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("execute: caught InterruptedException " +
+          "while IO thread is waiting!");
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java 
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
new file mode 100644
index 0000000..2230ec4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of classes related to IO operations in out-of-core mechanism
+ */
+package org.apache.giraph.ooc.io;

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
deleted file mode 100644
index d6e3a70..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdEdges;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Structure that keeps partition information.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public interface PartitionData<I extends WritableComparable,
-    V extends Writable, E extends Writable> {
-  /**
-   * Add a *new* partition to the store. If the partition is already existed,
-   * it does not add the partition and returns false.
-   *
-   * @param partition Partition to add
-   * @return Whether the addition made any change in the partition store
-   */
-  boolean addPartition(Partition<I, V, E> partition);
-
-  /**
-   * Remove a partition and return it. Called from a single thread, *not* from
-   * within a scheduling cycle, and after INPUT_SUPERSTEP is complete.
-   *
-   * @param partitionId Partition id
-   * @return The removed partition
-   */
-  Partition<I, V, E> removePartition(Integer partitionId);
-
-  /**
-   * Whether a specific partition is present in the store.
-   *
-   * @param partitionId Partition id
-   * @return True iff the partition is present
-   */
-  boolean hasPartition(Integer partitionId);
-
-  /**
-   * Return the ids of all the stored partitions as an Iterable.
-   *
-   * @return The partition ids
-   */
-  Iterable<Integer> getPartitionIds();
-
-  /**
-   * Return the number of stored partitions.
-   *
-   * @return The number of partitions
-   */
-  int getNumPartitions();
-
-  /**
-   * Return the number of vertices in a partition.
-   *
-   * @param partitionId Partition id
-   * @return The number of vertices in the specified partition
-   */
-  long getPartitionVertexCount(Integer partitionId);
-
-  /**
-   * Return the number of edges in a partition.
-   *
-   * @param partitionId Partition id
-   * @return The number of edges in the specified partition
-   */
-  long getPartitionEdgeCount(Integer partitionId);
-
-  /**
-   * Whether the partition store is empty.
-   *
-   * @return True iff there are no partitions in the store
-   */
-  boolean isEmpty();
-
-  /**
-   * Add vertices to a given partition from a given DataOutput instance. This
-   * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
-   *
-   * @param partitionId Partition id
-   * @param extendedDataOutput Output containing serialized vertex data
-   */
-  void addPartitionVertices(Integer partitionId,
-      ExtendedDataOutput extendedDataOutput);
-
-  /**
-   * Add edges to a given partition from a given send edge request. This
-   * method is called right after receipt of edge request in INPUT_SUPERSTEP.
-   *
-   * @param partitionId Partition id
-   * @param edges Edges in the request
-   */
-  void addPartitionEdges(Integer partitionId, VertexIdEdges<I, E> edges);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java 
b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 2facff8..5ff15eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -18,152 +18,106 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageData;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.EdgeStore;
-import org.apache.giraph.edge.EdgeStoreFactory;
-import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.io.IOException;
-
-import static 
org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS;
 
 /**
  * Structure that stores partitions for a worker. PartitionStore does not allow
  * random accesses to partitions except upon removal.
- * This structure is thread-safe.
+ * This structure is thread-safe, unless otherwise specified.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
  * @param <E> Edge data
  */
-public abstract class PartitionStore<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements PartitionData<I, V, E>, MessageData<I> {
-  /** Configuration. */
-  protected final ImmutableClassesGiraphConfiguration<I, V, E> conf;
-  /** Context used to report progress */
-  protected final Mapper<?, ?, ?, ?>.Context context;
-  /** service worker reference */
-  protected final CentralizedServiceWorker<I, V, E> serviceWorker;
-
-  /** Edge store for this worker */
-  protected final EdgeStore<I, V, E> edgeStore;
-
-  /** Message store factory */
-  protected MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
-      messageStoreFactory;
+public interface PartitionStore<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /**
+   * Add a *new* partition to the store. If the partition is already existed,
+   * it does not add the partition and returns false.
+   * Note: this method is not thread-safe and should be called by a single
+   * thread.
+   *
+   * @param partition Partition to add
+   * @return Whether the addition made any change in the partition store
+   */
+  boolean addPartition(Partition<I, V, E> partition);
+
+  /**
+   * Remove a partition and return it. Called from a single thread, *not* from
+   * within a scheduling cycle. This method should *not* be called in
+   * INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @return The removed partition
+   */
+  Partition<I, V, E> removePartition(Integer partitionId);
+
   /**
-   * Message store for incoming messages (messages which will be consumed
-   * in the next super step)
+   * Whether a specific partition is present in the store.
+   *
+   * @param partitionId Partition id
+   * @return True iff the partition is present
    */
-  protected volatile MessageStore<I, Writable> incomingMessageStore;
+  boolean hasPartition(Integer partitionId);
+
   /**
-   * Message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
+   * Return the ids of all the stored partitions as an Iterable.
+   *
+   * @return The partition ids
    */
-  protected volatile MessageStore<I, Writable> currentMessageStore;
+  Iterable<Integer> getPartitionIds();
 
   /**
-   * Constructor for abstract partition store
+   * Return the number of stored partitions.
    *
-   * @param conf Job configuration
-   * @param context Mapper context
-   * @param serviceWorker Worker service
+   * @return The number of partitions
    */
-  public PartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf,
-      Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    this.conf = conf;
-    this.context = context;
-    this.serviceWorker = serviceWorker;
-    this.messageStoreFactory = createMessageStoreFactory();
-    EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
-    edgeStoreFactory.initialize(serviceWorker, conf, context);
-    this.edgeStore = edgeStoreFactory.newStore();
-  }
+  int getNumPartitions();
 
   /**
-   * Decide which message store should be used for current application,
-   * and create the factory for that store
+   * Return the number of vertices in a partition.
    *
-   * @return Message store factory
+   * @param partitionId Partition id
+   * @return The number of vertices in the specified partition
    */
-  private MessageStoreFactory<I, Writable, MessageStore<I, Writable>>
-  createMessageStoreFactory() {
-    Class<? extends MessageStoreFactory> messageStoreFactoryClass =
-        MESSAGE_STORE_FACTORY_CLASS.get(conf);
-
-    MessageStoreFactory messageStoreFactoryInstance =
-        ReflectionUtils.newInstance(messageStoreFactoryClass);
-    messageStoreFactoryInstance.initialize(serviceWorker, conf);
-
-    return messageStoreFactoryInstance;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return getNumPartitions() == 0;
-  }
-
-  @Override
-  public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() {
-    return (MessageStore<I, M>) incomingMessageStore;
-  }
-
-  @Override
-  public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() {
-    return (MessageStore<I, M>) currentMessageStore;
-  }
-
-  @Override
-  public void resetMessageStores() throws IOException {
-    if (currentMessageStore != null) {
-      currentMessageStore.clearAll();
-      currentMessageStore = null;
-    }
-    if (incomingMessageStore != null) {
-      incomingMessageStore.clearAll();
-      incomingMessageStore = null;
-    }
-    prepareSuperstep();
-  }
-
-  /** Prepare for next super step */
-  public void prepareSuperstep() {
-    if (currentMessageStore != null) {
-      try {
-        currentMessageStore.clearAll();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to clear previous message store");
-      }
-    }
-    currentMessageStore = incomingMessageStore != null ?
-        incomingMessageStore :
-        messageStoreFactory.newStore(conf.getIncomingMessageClasses());
-    incomingMessageStore =
-        messageStoreFactory.newStore(conf.getOutgoingMessageClasses());
-    // finalize current message-store before resolving mutations
-    currentMessageStore.finalizeStore();
-  }
+  long getPartitionVertexCount(Integer partitionId);
+
+  /**
+   * Return the number of edges in a partition.
+   *
+   * @param partitionId Partition id
+   * @return The number of edges in the specified partition
+   */
+  long getPartitionEdgeCount(Integer partitionId);
+
+  /**
+   * Whether the partition store is empty.
+   *
+   * @return True iff there are no partitions in the store
+   */
+  boolean isEmpty();
+
+  /**
+   * Add vertices to a given partition from a given DataOutput instance. This
+   * method is called right after receipt of vertex request in INPUT_SUPERSTEP.
+   *
+   * @param partitionId Partition id
+   * @param extendedDataOutput Output containing serialized vertex data
+   */
+  void addPartitionVertices(Integer partitionId,
+                            ExtendedDataOutput extendedDataOutput);
 
   /**
    * Called at the end of the computation. Called from a single thread.
    */
-  public void shutdown() { }
+  void shutdown();
 
   /**
    * Called at the beginning of the computation. Called from a single thread.
    */
-  public void initialize() { }
+  void initialize();
 
   /**
    * Start the iteration cycle to iterate over partitions. Note that each
@@ -194,7 +148,7 @@ public abstract class PartitionStore<I extends 
WritableComparable,
    *
    * Called from a single thread.
    */
-  public abstract void startIteration();
+  void startIteration();
 
   /**
    * Return the next partition in iteration for the current superstep.
@@ -204,7 +158,7 @@ public abstract class PartitionStore<I extends 
WritableComparable,
    *
    * @return The next partition to process
    */
-  public abstract Partition<I, V, E> getNextPartition();
+  Partition<I, V, E> getNextPartition();
 
   /**
    * Put a partition back to the store. Use this method to put a partition
@@ -213,21 +167,5 @@ public abstract class PartitionStore<I extends 
WritableComparable,
    *
    * @param partition Partition
    */
-  public abstract void putPartition(Partition<I, V, E> partition);
-
-  /**
-   * Move edges from edge store to partitions. This method is called from a
-   * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP.
-   */
-  public abstract void moveEdgesToVertices();
-
-  /**
-   * In case of async message store we have to wait for all messages
-   * to be processed before going into next superstep.
-   */
-  public void waitForComplete() {
-    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
-      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
-    }
-  }
+  void putPartition(Partition<I, V, E> partition);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 9f0c408..4846702 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -19,18 +19,13 @@
 package org.apache.giraph.partition;
 
 import com.google.common.collect.Maps;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdEdges;
-import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.giraph.utils.VertexIterator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -46,7 +41,11 @@ import static 
com.google.common.base.Preconditions.checkState;
  */
 public class SimplePartitionStore<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends PartitionStore<I, V, E> {
+    implements PartitionStore<I, V, E> {
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  /** Job context (for progress) */
+  private final Mapper<?, ?, ?, ?>.Context context;
   /** Map of stored partitions. */
   private final ConcurrentMap<Integer, Partition<I, V, E>> partitions =
       Maps.newConcurrentMap();
@@ -57,12 +56,11 @@ public class SimplePartitionStore<I extends 
WritableComparable,
    * Constructor.
    * @param conf Configuration
    * @param context Mapper context
-   * @param serviceWorker Service worker
    */
   public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> 
conf,
-      Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E> serviceWorker) {
-    super(conf, context, serviceWorker);
+      Mapper<?, ?, ?, ?>.Context context) {
+    this.conf = conf;
+    this.context = context;
   }
 
   @Override
@@ -111,6 +109,11 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
+  public boolean isEmpty() {
+    return partitions.size() == 0;
+  }
+
+  @Override
   public void startIteration() {
     checkState(partitionQueue == null || partitionQueue.isEmpty(),
         "startIteration: It seems that some of " +
@@ -162,27 +165,8 @@ public class SimplePartitionStore<I extends 
WritableComparable,
   }
 
   @Override
-  public void addPartitionEdges(Integer partitionId,
-      VertexIdEdges<I, E> edges) {
-    edgeStore.addPartitionEdges(partitionId, edges);
-  }
+  public void shutdown() { }
 
   @Override
-  public void moveEdgesToVertices() {
-    edgeStore.moveEdgesToVertices();
-  }
-
-  @Override
-  public <M extends Writable> void addPartitionCurrentMessages(
-      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
-    ((MessageStore<I, M>) currentMessageStore)
-        .addPartitionMessages(partitionId, messages);
-  }
-
-  @Override
-  public <M extends Writable> void addPartitionIncomingMessages(
-      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
-    ((MessageStore<I, M>) incomingMessageStore)
-        .addPartitionMessages(partitionId, messages);
-  }
+  public void initialize() { }
 }

Reply via email to