http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java new file mode 100644 index 0000000..b8a2dd5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java @@ -0,0 +1,1769 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.edge.EdgeStore; +import org.apache.giraph.edge.EdgeStoreFactory; +import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.partition.Partition; +import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.utils.ByteArrayVertexIdEdges; +import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.PairList; +import org.apache.giraph.utils.VertexIdEdges; +import org.apache.giraph.utils.VertexIterator; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.BspServiceWorker; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY; +import static org.apache.giraph.conf.GiraphConstants.ONE_MB; +import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY; + +/** + * Disk-backed PartitionStore. An instance of this class can be coupled with an + * out-of-core engine. Out-of-core engine is responsible to determine when to + * offload and what to offload to disk. The instance of this class handles the + * interactions with disk. + * + * This class provides efficient scheduling mechanism while iterating over + * partitions. It prefers spilling in-memory processed partitions, but the + * scheduling can be improved upon further. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +@SuppressWarnings("rawtypes") +public class DiskBackedPartitionStore<I extends WritableComparable, + V extends Writable, E extends Writable> + extends PartitionStore<I, V, E> { + /** + * Minimum size of a buffer (in bytes) to flush to disk. This is used to + * decide whether vertex/edge buffers are large enough to flush to disk. + */ + public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = + new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, + "Minimum size of a buffer (in bytes) to flush to disk. "); + + /** Class logger. 
*/ + private static final Logger LOG = + Logger.getLogger(DiskBackedPartitionStore.class); + + /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ + private final int minBuffSize; + /** + * States the partition can be found in: + * INIT: the partition has just been created + * ACTIVE: there is at least one thread who holds a reference to the partition + * and uses it + * INACTIVE: the partition is not being used by anyone, but it is in memory + * IN_TRANSIT: the partition is being transferred to disk, the transfer is + * not yet complete + * ON_DISK: the partition resides on disk + */ + private enum State { INIT, ACTIVE, INACTIVE, IN_TRANSIT, ON_DISK }; + + /** Hash map containing all the partitions */ + private final ConcurrentMap<Integer, MetaPartition> partitions = + Maps.newConcurrentMap(); + + /** + * Contains partitions that has been processed in the current iteration cycle, + * and are not in use by any thread. The 'State' of these partitions can only + * be INACTIVE, IN_TRANSIT, and ON_DISK. + */ + private final Map<State, Set<Integer>> processedPartitions; + /** + * Contains partitions that has *not* been processed in the current iteration + * cycle. Similar to processedPartitions, 'State' if these partitions can only + * be INACTIVE, IN_TRANSIT, and ON_DISK. + */ + private final Map<State, Set<Integer>> unProcessedPartitions; + + /** + * Read/Write lock to avoid interleaving of the process of starting a new + * iteration cycle and the process of spilling data to disk. This is necessary + * as starting a new iteration changes the data structure holding data that is + * being spilled to disk. Spilling of different data can happen at the same + * time (a read lock used for spilling), and cannot be overlapped with + * change of data structure holding the data. 
+ */ + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** Giraph configuration */ + private final + ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Mapper context */ + private final Context context; + /** Base path where the partition files are written to */ + private final String[] basePaths; + /** Used to hash partition Ids */ + private final HashFunction hasher = Hashing.murmur3_32(); + /** Maximum number of partition slots in memory */ + private final AtomicInteger maxPartitionsInMem = new AtomicInteger(-1); + /** Number of slots used */ + private final AtomicInteger numPartitionsInMem = new AtomicInteger(0); + /** service worker reference */ + private CentralizedServiceWorker<I, V, E> serviceWorker; + + /** Out-of-core engine */ + private final OutOfCoreEngine oocEngine; + /** Edge store for this worker */ + private final EdgeStore<I, V, E> edgeStore; + /** If moving of edges to vertices in INPUT_SUPERSTEP has been started */ + private volatile boolean movingEdges; + /** Whether the partition store is initialized */ + private volatile AtomicBoolean isInitialized; + /** Whether the number of partitions are fixed as requested by user */ + private final boolean isNumPartitionsFixed; + + /** + * Map of partition ids to list of input vertex buffers. The map will have an + * entry only for partitions that are currently out-of-core. We keep the + * aggregate size of buffers in as part of the values of the map to estimate + * how much memory would be free if we offload this buffer to disk. + */ + private final ConcurrentMap<Integer, Pair<Integer, List<ExtendedDataOutput>>> + pendingInputVertices = Maps.newConcurrentMap(); + /** + * When a partition is out-of-core, and we also offloaded some of its vertex + * buffers, we have to keep track of how many buffers we offloaded to disk. + * This contains this value for out-of-core partitions. 
+ */ + private final ConcurrentMap<Integer, Integer> numPendingInputVerticesOnDisk = + Maps.newConcurrentMap(); + /** Lock to avoid overlap of addition and removal on pendingInputVertices */ + private ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock(); + + /** + * Similar to vertex buffer, but used for input edges (see comments for + * pendingInputVertices). + */ + private final ConcurrentMap<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>> + pendingInputEdges = Maps.newConcurrentMap(); + /** Similar to numPendingInputVerticesOnDisk but used for edge buffers */ + private final ConcurrentMap<Integer, Integer> numPendingInputEdgesOnDisk = + Maps.newConcurrentMap(); + /** Lock to avoid overlap of addition and removal on pendingInputEdges */ + private ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock(); + + /** + * For each out-of-core partitions, whether its edge store is also + * offloaded to disk in INPUT_SUPERSTEP. + */ + private final ConcurrentMap<Integer, Boolean> hasEdgeStoreOnDisk = + Maps.newConcurrentMap(); + + /** + * Constructor + * + * @param conf Configuration + * @param context Context + * @param serviceWorker service worker reference + */ + public DiskBackedPartitionStore( + ImmutableClassesGiraphConfiguration<I, V, E> conf, + Mapper<?, ?, ?, ?>.Context context, + CentralizedServiceWorker<I, V, E> serviceWorker) { + this.conf = conf; + this.context = context; + this.serviceWorker = serviceWorker; + this.minBuffSize = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); + int userMaxNumPartitions = MAX_PARTITIONS_IN_MEMORY.get(conf); + if (userMaxNumPartitions > 0) { + this.isNumPartitionsFixed = true; + this.maxPartitionsInMem.set(userMaxNumPartitions); + oocEngine = null; + } else { + this.isNumPartitionsFixed = false; + this.oocEngine = + new AdaptiveOutOfCoreEngine<I, V, E>(conf, serviceWorker); + } + EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); + edgeStoreFactory.initialize(serviceWorker, conf, context); + 
this.edgeStore = edgeStoreFactory.newStore(); + this.movingEdges = false; + this.isInitialized = new AtomicBoolean(false); + + this.processedPartitions = Maps.newHashMap(); + this.processedPartitions + .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet()); + this.processedPartitions + .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet()); + this.processedPartitions + .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet()); + + this.unProcessedPartitions = Maps.newHashMap(); + this.unProcessedPartitions + .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet()); + this.unProcessedPartitions + .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet()); + this.unProcessedPartitions + .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet()); + + // Take advantage of multiple disks + String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf); + basePaths = new String[userPaths.length]; + int i = 0; + for (String path : userPaths) { + basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job"); + } + if (LOG.isInfoEnabled()) { + LOG.info("DiskBackedPartitionStore with isStaticGraph=" + + conf.isStaticGraph() + ((userMaxNumPartitions > 0) ? + (" with maximum " + userMaxNumPartitions + " partitions in memory.") : + ".")); + } + } + + /** + * @return maximum number of partitions allowed in memory + */ + public int getNumPartitionSlots() { + return maxPartitionsInMem.get(); + } + + /** + * @return number of partitions in memory + */ + public int getNumPartitionInMemory() { + return numPartitionsInMem.get(); + } + + /** + * Sets the maximum number of partitions allowed in memory + * + * @param numPartitions Number of partitions to allow in memory + */ + public void setNumPartitionSlots(int numPartitions) { + maxPartitionsInMem.set(numPartitions); + } + + @Override + public void initialize() { + // "initialize" is called right before partition assignment in setup + // process. 
However, it might be the case that this worker is a bit slow + // and other workers start sending vertices/edges (in input superstep) + // to this worker before the initialize is called. So, we put a guard in + // necessary places to make sure the 'initialize' is called at a proper time + // and also only once. + if (isInitialized.compareAndSet(false, true)) { + // Set the maximum number of partition slots in memory if unset + if (maxPartitionsInMem.get() == -1) { + maxPartitionsInMem.set(serviceWorker.getNumPartitionsOwned()); + // Check if master has not done partition assignment yet (may happen in + // test codes) + if (maxPartitionsInMem.get() == 0) { + LOG.warn("initialize: partitions assigned to this worker is not " + + "known yet"); + maxPartitionsInMem.set(partitions.size()); + if (maxPartitionsInMem.get() == 0) { + maxPartitionsInMem.set(Integer.MAX_VALUE); + } + } + if (LOG.isInfoEnabled()) { + LOG.info("initialize: set the max number of partitions in memory " + + "to " + maxPartitionsInMem.get()); + } + oocEngine.initialize(); + } + } + } + + @Override + public Iterable<Integer> getPartitionIds() { + return Iterables.unmodifiableIterable(partitions.keySet()); + } + + @Override + public boolean hasPartition(final Integer id) { + return partitions.containsKey(id); + } + + @Override + public int getNumPartitions() { + return partitions.size(); + } + + @Override + public long getPartitionVertexCount(Integer partitionId) { + MetaPartition meta = partitions.get(partitionId); + if (meta == null) { + return 0; + } else if (meta.getState() == State.ON_DISK) { + return meta.getVertexCount(); + } else { + return meta.getPartition().getVertexCount(); + } + } + + @Override + public long getPartitionEdgeCount(Integer partitionId) { + MetaPartition meta = partitions.get(partitionId); + if (meta == null) { + return 0; + } else if (meta.getState() == State.ON_DISK) { + return meta.getEdgeCount(); + } else { + return meta.getPartition().getEdgeCount(); + } + } + + /** + * 
Spill one partition to disk. + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT") + private void swapOnePartitionToDisk() { + Integer partitionId; + // The only partitions in memory are IN_TRANSIT, ACTIVE, and INACTIVE ones. + // If a partition is currently in transit, it means an OOC thread is + // pushing the partition to disk, or a compute thread is swapping the + // partition to open up space for another partition. So, such partitions + // eventually will free up space in memory. However, this method is usually + // called at critical points where freeing up space in memory is crucial. + // So, we should look into a partition to swap amongst other in-memory + // partitions. An in-memory partition that is not in-transit can be in + // three states: + // 1) already processed, which we can simply swap it to disk, + // 2) non-processed but active (means someone is in the middle of + // processing the partition). In this case we cannot touch the + // partition until its processing is done. + // 3) un-processed and inactive. It is bad to swap this partition to disk + // as someone will load it again for processing in future. But, this + // method is called to strictly swap a partition to disk. So, if there + // is no partition in state 1, we should swap a partition in state 3 to + // disk. 
+ rwLock.readLock().lock(); + synchronized (processedPartitions) { + partitionId = popFromSet(processedPartitions.get(State.INACTIVE)); + } + if (partitionId == null) { + synchronized (unProcessedPartitions) { + partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE)); + } + if (partitionId == null) { + // At this point some partitions are being processed and we should + // wait until their processing is done + synchronized (processedPartitions) { + partitionId = popFromSet(processedPartitions.get(State.INACTIVE)); + while (partitionId == null) { + try { + // Here is the only place we wait on 'processedPartition', and + // this wait is only for INACTIVE entry of the map. So, only at + // times where a partition is added to INACTIVE entry of this map, + // we should call '.notifyAll()'. Although this might seem a bad + // practice, decoupling the INACTIVE entry from this map makes the + // synchronization mechanism cumbersome and error-prone. + processedPartitions.wait(); + } catch (InterruptedException e) { + throw new IllegalStateException("swapOnePartitionToDisk: Caught" + + "InterruptedException while waiting on a partition to" + + "become inactive in memory and swapping it to disk"); + } + partitionId = popFromSet(processedPartitions.get(State.INACTIVE)); + } + } + } + } + + if (LOG.isInfoEnabled()) { + LOG.info("swapOnePartitionToDisk: decided to swap partition " + + partitionId + " to disk"); + } + MetaPartition swapOutPartition = partitions.get(partitionId); + if (swapOutPartition == null) { + throw new IllegalStateException("swapOnePartitionToDisk: the partition " + + "is not found to spill to disk (impossible)"); + } + + // Since the partition is popped from the maps, it is not going to be + // processed (or change its process state) until spilling of the partition + // is done (the only way to access a partition is through + // processedPartitions or unProcessedPartitions Map, so once a partition is + // popped from a map, there is no need for 
synchronization on that + // partition). + Map<State, Set<Integer>> ownerMap = (swapOutPartition.isProcessed()) ? + processedPartitions : + unProcessedPartitions; + + // Here is the *only* place that holds a lock on an in-transit partition. + // Anywhere else in the code should call wait() on the in-transit partition + // to release the lock. This is an important optimization as we are no + // longer have to keep the lock while partition is being transferred to + // disk. + synchronized (swapOutPartition) { + swapOutPartition.setState(State.IN_TRANSIT); + synchronized (ownerMap) { + ownerMap.get(State.IN_TRANSIT).add(partitionId); + } + } + + try { + if (LOG.isInfoEnabled()) { + LOG.info("swapOnePartitionToDisk: start swapping partition " + + partitionId + " to disk."); + } + offloadPartition(swapOutPartition); + if (LOG.isInfoEnabled()) { + LOG.info("swapOnePartitionToDisk: done swapping partition " + + partitionId + " to disk."); + } + } catch (IOException e) { + throw new IllegalStateException( + "swapOnePartitionToDisk: Failed while offloading partition " + + partitionId); + } + + synchronized (swapOutPartition) { + synchronized (ownerMap) { + boolean stillInMap = ownerMap.get(State.IN_TRANSIT).remove(partitionId); + swapOutPartition.setOnDisk(); + numPartitionsInMem.getAndDecrement(); + // If a compute thread gets an IN_TRANSIT partition (as the last resort + // to get the next partition to process), 'swapOutPartition' may no + // longer be in its map. But, if it is in its own map, we should update + // the map. + if (stillInMap) { + ownerMap.get(State.ON_DISK).add(partitionId); + } + } + // notifying all threads that are waiting for this partition to spill to + // disk. + swapOutPartition.notifyAll(); + } + rwLock.readLock().unlock(); + } + + /** + * Decrement maximum number of partitions allowed in memory by one and pushes + * one partition to disk if necessary. 
+ */ + public void spillOnePartition() { + if (maxPartitionsInMem.getAndDecrement() <= numPartitionsInMem.get()) { + swapOnePartitionToDisk(); + } + } + + @Override + public Partition<I, V, E> getNextPartition() { + Integer partitionId; + // We prioritize accesses to currently in-memory partitions first. If there + // is no such partition, we choose amongst on-disk partitions. This is a + // preferable choice over in-transit partitions since we can start bringing + // on-disk partitions to memory right away, while if we choose in-transit + // partitions, we first have to wait for the transit to be complete, and + // then bring the partition back in memory again. + synchronized (unProcessedPartitions) { + partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE)); + if (partitionId == null) { + partitionId = popFromSet(unProcessedPartitions.get(State.ON_DISK)); + if (partitionId == null) { + partitionId = popFromSet(unProcessedPartitions.get(State.IN_TRANSIT)); + } + } + } + + // Check if we are at the end of the current iteration cycle + if (partitionId == null) { + return null; + } + + MetaPartition meta = partitions.get(partitionId); + if (meta == null) { + throw new IllegalStateException("getNextPartition: partition " + + partitionId + " does not exist (impossible)"); + } + + // The only time we iterate through all partitions in INPUT_SUPERSTEP is + // when we want to move + // edges from edge store to vertices. So, we check if we have anything in + // edge store for the chosen partition, and if there is no edge store for + // this partition, we skip processing it. This avoids unnecessary loading + // of on-disk partitions that does not have edge store. 
+ if (movingEdges) { + boolean shouldProcess = false; + synchronized (meta) { + if (meta.getState() == State.INACTIVE) { + shouldProcess = edgeStore.hasPartitionEdges(partitionId); + } else { // either ON_DISK or IN_TRANSIT + Integer numBuf = numPendingInputEdgesOnDisk.get(partitionId); + Boolean hasStore = hasEdgeStoreOnDisk.get(partitionId); + shouldProcess = + (numBuf != null && numBuf != 0) || (hasStore != null && hasStore); + } + if (!shouldProcess) { + meta.setProcessed(true); + synchronized (processedPartitions) { + processedPartitions.get(meta.getState()).add(partitionId); + if (meta.getState() == State.INACTIVE) { + processedPartitions.notifyAll(); + } + } + } + } + if (!shouldProcess) { + return getNextPartition(); + } + } + getPartition(meta); + return meta.getPartition(); + } + + /** + * Method that gets a partition from the store. + * The partition is produced as a side effect of the computation and is + * reflected inside the META object provided as parameter. + * This function is thread-safe since it locks the whole computation + * on the MetaPartition provided. + * + * When a thread tries to access an element on disk, it waits until space + * becomes available in memory by swapping partitions to disk. 
+ * + * @param meta meta partition container with the partition itself + */ + private void getPartition(MetaPartition meta) { + int partitionId = meta.getId(); + synchronized (meta) { + boolean partitionInMemory = false; + while (!partitionInMemory) { + switch (meta.getState()) { + case INACTIVE: + meta.setState(State.ACTIVE); + partitionInMemory = true; + break; + case IN_TRANSIT: + try { + // Wait until the partition transfer to disk is complete + meta.wait(); + } catch (InterruptedException e) { + throw new IllegalStateException("getPartition: exception " + + "while waiting on IN_TRANSIT partition " + partitionId + " to" + + " fully spill to disk."); + } + break; + case ON_DISK: + boolean spaceAvailable = false; + + while (numPartitionsInMem.get() >= maxPartitionsInMem.get()) { + swapOnePartitionToDisk(); + } + + // Reserve the space in memory for the partition + if (numPartitionsInMem.incrementAndGet() <= + maxPartitionsInMem.get()) { + spaceAvailable = true; + } else { + numPartitionsInMem.decrementAndGet(); + } + + if (spaceAvailable) { + Partition<I, V, E> partition; + try { + if (LOG.isInfoEnabled()) { + LOG.info("getPartition: start reading partition " + + partitionId + " from disk"); + } + partition = loadPartition(partitionId, meta.getVertexCount()); + if (LOG.isInfoEnabled()) { + LOG.info("getPartition: done reading partition " + + partitionId + " from disk"); + } + } catch (IOException e) { + LOG.error("getPartition: Failed while Loading Partition " + + "from disk: " + e.getMessage()); + throw new IllegalStateException(e); + } + meta.setActive(partition); + partitionInMemory = true; + } + break; + default: + throw new IllegalStateException("illegal state " + meta.getState() + + " for partition " + meta.getId()); + } + } + } + } + + /** + * Spills edge store generated for specified partition in INPUT_SUPERSTEP + * Note that the partition should be ON_DISK or IN_TRANSIT. 
+ * + * @param partitionId Id of partition to spill its edge buffer + */ + public void spillPartitionInputEdgeStore(Integer partitionId) + throws IOException { + rwLock.readLock().lock(); + if (movingEdges) { + rwLock.readLock().unlock(); + return; + } + Pair<Integer, List<VertexIdEdges<I, E>>> entry; + + // Look at the comment for the similar logic in + // 'spillPartitionInputVertexBuffer' for why this lock is necessary. + edgeBufferRWLock.writeLock().lock(); + entry = pendingInputEdges.remove(partitionId); + edgeBufferRWLock.writeLock().unlock(); + + // Check if the intermediate edge store has already been moved to vertices + if (entry == null) { + rwLock.readLock().unlock(); + return; + } + + // Sanity check + if (entry.getRight().isEmpty()) { + throw new IllegalStateException("spillPartitionInputEdgeStore: " + + "the edge buffer that is supposed to be flushed to disk does not" + + "exist."); + } + + List<VertexIdEdges<I, E>> bufferList = entry.getRight(); + Integer numBuffers = numPendingInputEdgesOnDisk.putIfAbsent(partitionId, + bufferList.size()); + if (numBuffers != null) { + numPendingInputEdgesOnDisk.replace( + partitionId, numBuffers + bufferList.size()); + } + + File file = new File(getPendingEdgesBufferPath(partitionId)); + FileOutputStream fos = new FileOutputStream(file, true); + BufferedOutputStream bos = new BufferedOutputStream(fos); + DataOutputStream dos = new DataOutputStream(bos); + for (VertexIdEdges<I, E> edges : entry.getRight()) { + edges.write(dos); + } + dos.close(); + rwLock.readLock().unlock(); + } + + /** + * Looks through all partitions already on disk, and see if any of them has + * enough pending edges in its buffer in memory. If so, put that + * partition along with an approximate amount of memory it took (in bytes) in + * a list to return. 
+ + * @return List of pairs (partitionId, sizeInByte) of the partitions where + * their pending edge store in input superstep in worth flushing to + * disk + */ + public PairList<Integer, Integer> getOocPartitionIdsWithPendingInputEdges() { + PairList<Integer, Integer> pairList = new PairList<>(); + pairList.initialize(); + if (!movingEdges) { + for (Entry<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>> entry : + pendingInputEdges.entrySet()) { + if (entry.getValue().getLeft() > minBuffSize) { + pairList.add(entry.getKey(), entry.getValue().getLeft()); + } + } + } + return pairList; + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH") + public void addPartitionEdges(Integer partitionId, + VertexIdEdges<I, E> edges) { + if (!isInitialized.get()) { + initialize(); + } + + MetaPartition meta = new MetaPartition(partitionId); + MetaPartition temp = partitions.putIfAbsent(partitionId, meta); + if (temp != null) { + meta = temp; + } + + boolean createPartition = false; + synchronized (meta) { + switch (meta.getState()) { + case INIT: + Partition<I, V, E> partition = + conf.createPartition(partitionId, context); + meta.setPartition(partition); + // This is set to processed so that in the very next iteration cycle, + // when startIteration is called, all partitions seem to be processed + // and ready for the next iteration cycle. Otherwise, startIteration + // fails in its sanity check due to finding an unprocessed partition. 
+ meta.setProcessed(true); + numPartitionsInMem.getAndIncrement(); + meta.setState(State.INACTIVE); + synchronized (processedPartitions) { + processedPartitions.get(State.INACTIVE).add(partitionId); + processedPartitions.notifyAll(); + } + createPartition = true; + // Continue to INACTIVE case to add the edges to the partition + // CHECKSTYLE: stop FallThrough + case INACTIVE: + // CHECKSTYLE: resume FallThrough + edgeStore.addPartitionEdges(partitionId, edges); + break; + case IN_TRANSIT: + case ON_DISK: + // Adding edges to in-memory buffer of the partition + List<VertexIdEdges<I, E>> newEdges = + new ArrayList<VertexIdEdges<I, E>>(); + newEdges.add(edges); + int length = edges.getSerializedSize(); + Pair<Integer, List<VertexIdEdges<I, E>>> newPair = + new MutablePair<>(length, newEdges); + edgeBufferRWLock.readLock().lock(); + Pair<Integer, List<VertexIdEdges<I, E>>> oldPair = + pendingInputEdges.putIfAbsent(partitionId, newPair); + if (oldPair != null) { + synchronized (oldPair) { + MutablePair<Integer, List<VertexIdEdges<I, E>>> pair = + (MutablePair<Integer, List<VertexIdEdges<I, E>>>) oldPair; + pair.setLeft(pair.getLeft() + length); + pair.getRight().add(edges); + } + } + edgeBufferRWLock.readLock().unlock(); + // In the case that the number of partitions is asked to be fixed by the + // user, we should offload the edge store as necessary. + if (isNumPartitionsFixed && + pendingInputEdges.get(partitionId).getLeft() > minBuffSize) { + try { + spillPartitionInputEdgeStore(partitionId); + } catch (IOException e) { + throw new IllegalStateException("addPartitionEdges: spilling " + + "edge store for partition " + partitionId + " failed!"); + } + } + break; + default: + throw new IllegalStateException("illegal state " + meta.getState() + + " for partition " + meta.getId()); + } + } + // If creation of a new partition is violating the policy of maximum number + // of partitions in memory, we should spill a partition to disk. 
+ if (createPartition && + numPartitionsInMem.get() > maxPartitionsInMem.get()) { + swapOnePartitionToDisk(); + } + } + + /** + * Spills vertex buffer generated for specified partition in INPUT_SUPERSTEP + * Note that the partition should be ON_DISK or IN_TRANSIT. + * + * @param partitionId Id of partition to spill its vertex buffer + */ + public void spillPartitionInputVertexBuffer(Integer partitionId) + throws IOException { + rwLock.readLock().lock(); + if (movingEdges) { + rwLock.readLock().unlock(); + return; + } + Pair<Integer, List<ExtendedDataOutput>> entry; + // Synchronization on the concurrent map is necessary to avoid inconsistent + // structure while execution of this method is interleaved with the + // execution of addPartitionVertices. For instance, consider + // the case where a thread wants to modify the value of an entry in + // addPartitionVertices while another thread is running this + // method removing the entry from the map. If removal and offloading the + // entry's value to disk happens first, the modification by former thread + // would be lost. 
+ vertexBufferRWLock.writeLock().lock(); + entry = pendingInputVertices.remove(partitionId); + vertexBufferRWLock.writeLock().unlock(); + + // Check if vertex buffer has already been merged with the partition + if (entry == null) { + rwLock.readLock().unlock(); + return; + } + // Sanity check + if (entry.getRight().isEmpty()) { + throw new IllegalStateException("spillPartitionInputVertexBuffer: " + + "the vertex buffer that is supposed to be flushed to disk does not" + + "exist."); + } + + List<ExtendedDataOutput> bufferList = entry.getRight(); + Integer numBuffers = numPendingInputVerticesOnDisk.putIfAbsent(partitionId, + bufferList.size()); + if (numBuffers != null) { + numPendingInputVerticesOnDisk.replace(partitionId, + numBuffers + bufferList.size()); + } + + File file = new File(getPendingVerticesBufferPath(partitionId)); + FileOutputStream fos = new FileOutputStream(file, true); + BufferedOutputStream bos = new BufferedOutputStream(fos); + DataOutputStream dos = new DataOutputStream(bos); + for (ExtendedDataOutput extendedDataOutput : bufferList) { + WritableUtils.writeExtendedDataOutput(extendedDataOutput, dos); + } + dos.close(); + rwLock.readLock().unlock(); + } + + /** + * Looks through all partitions already on disk, and see if any of them has + * enough pending vertices in its buffer in memory. If so, put that + * partition along with an approximate amount of memory it took (in bytes) in + * a list to return. 
+ * + * @return List of pairs (partitionId, sizeInByte) of the partitions where + * their pending vertex buffer in input superstep is worth flushing to + * disk + */ + public PairList<Integer, Integer> + getOocPartitionIdsWithPendingInputVertices() { + PairList<Integer, Integer> pairList = new PairList<>(); + pairList.initialize(); + if (!movingEdges) { + for (Entry<Integer, Pair<Integer, List<ExtendedDataOutput>>> entry : + pendingInputVertices.entrySet()) { + if (entry.getValue().getLeft() > minBuffSize) { + pairList.add(entry.getKey(), entry.getValue().getLeft()); + } + } + } + return pairList; + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH") + public void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput) { + if (!isInitialized.get()) { + initialize(); + } + + MetaPartition meta = new MetaPartition(partitionId); + MetaPartition temp = partitions.putIfAbsent(partitionId, meta); + if (temp != null) { + meta = temp; + } + + boolean createPartition = false; + synchronized (meta) { + switch (meta.getState()) { + case INIT: + Partition<I, V, E> partition = + conf.createPartition(partitionId, context); + meta.setPartition(partition); + // A freshly created partition is registered as already processed. + // NOTE(review): the original comment pointed at 'addPartitionVertices' + // (this very method) for the rationale; it presumably meant + // 'addPartitionEdges' -- confirm.
+ meta.setProcessed(true); + numPartitionsInMem.getAndIncrement(); + meta.setState(State.INACTIVE); + synchronized (processedPartitions) { + processedPartitions.get(State.INACTIVE).add(partitionId); + processedPartitions.notifyAll(); + } + createPartition = true; + // Continue to INACTIVE case to add the vertices to the partition + // CHECKSTYLE: stop FallThrough + case INACTIVE: + // CHECKSTYLE: resume FallThrough + meta.getPartition().addPartitionVertices( + new VertexIterator<I, V, E>(extendedDataOutput, conf)); + break; + case IN_TRANSIT: + case ON_DISK: + // Adding vertices to in-memory buffer of the partition + List<ExtendedDataOutput> vertices = new ArrayList<ExtendedDataOutput>(); + vertices.add(extendedDataOutput); + int length = extendedDataOutput.getPos(); + Pair<Integer, List<ExtendedDataOutput>> newPair = + new MutablePair<>(length, vertices); + // Size of the buffer after this addition. It is captured while the read + // lock is held, because a concurrent spill may remove the map entry as + // soon as the lock is released (a later map lookup could return null). + int bufferedSize = length; + vertexBufferRWLock.readLock().lock(); + try { + Pair<Integer, List<ExtendedDataOutput>> oldPair = + pendingInputVertices.putIfAbsent(partitionId, newPair); + if (oldPair != null) { + synchronized (oldPair) { + MutablePair<Integer, List<ExtendedDataOutput>> pair = + (MutablePair<Integer, List<ExtendedDataOutput>>) oldPair; + pair.setLeft(pair.getLeft() + length); + pair.getRight().add(extendedDataOutput); + bufferedSize = pair.getLeft(); + } + } + } finally { + vertexBufferRWLock.readLock().unlock(); + } + // In the case that the number of partitions is asked to be fixed by the + // user, we should spill the vertex buffer as necessary.
+ if (isNumPartitionsFixed && bufferedSize > minBuffSize) { + try { + spillPartitionInputVertexBuffer(partitionId); + } catch (IOException e) { + // Keep the I/O failure as the cause so it is not lost + throw new IllegalStateException("addPartitionVertices: spilling " + + "vertex buffer for partition " + partitionId + " failed!", e); + } + } + break; + default: + throw new IllegalStateException("illegal state " + meta.getState() + + " for partition " + meta.getId()); + } + } + // If creation of a new partition is violating the policy of maximum number + // of partitions in memory, we should spill a partition to disk. + if (createPartition && + numPartitionsInMem.get() > maxPartitionsInMem.get()) { + swapOnePartitionToDisk(); + } + } + + @Override + public void putPartition(Partition<I, V, E> partition) { + if (partition == null) { + throw new IllegalStateException("putPartition: partition to put is null" + + " (impossible)"); + } + Integer id = partition.getId(); + MetaPartition meta = partitions.get(id); + if (meta == null) { + throw new IllegalStateException("putPartition: partition to put does " + + "not exist in the store (impossible)"); + } + synchronized (meta) { + if (meta.getState() != State.ACTIVE) { + String msg = "It is not possible to put back a partition which is " + + "not ACTIVE.\n" + meta.toString(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + meta.setState(State.INACTIVE); + meta.setProcessed(true); + synchronized (processedPartitions) { + processedPartitions.get(State.INACTIVE).add(id); + // Notify OOC threads waiting for a partition to become available to put + // on disk. + processedPartitions.notifyAll(); + } + } + } + + @Override + public Partition<I, V, E> removePartition(Integer partitionId) { + if (hasPartition(partitionId)) { + MetaPartition meta = partitions.remove(partitionId); + // Since this method is called outside of the iteration cycle, all + // partitions in the store should be in the processed state. 
+ if (!processedPartitions.get(meta.getState()).remove(partitionId)) { + throw new IllegalStateException("removePartition: partition that is" + + "about to remove is not in processed list (impossible)"); + } + getPartition(meta); + numPartitionsInMem.getAndDecrement(); + return meta.getPartition(); + } + return null; + } + + @Override + public boolean addPartition(Partition<I, V, E> partition) { + if (!isInitialized.get()) { + initialize(); + } + + Integer id = partition.getId(); + MetaPartition meta = new MetaPartition(id); + MetaPartition temp = partitions.putIfAbsent(id, meta); + if (temp != null) { + return false; + } + + if (LOG.isInfoEnabled()) { + LOG.info("addPartition: partition " + id + "is added to the store."); + } + + meta.setPartition(partition); + meta.setState(State.INACTIVE); + meta.setProcessed(true); + synchronized (processedPartitions) { + processedPartitions.get(State.INACTIVE).add(id); + processedPartitions.notifyAll(); + } + numPartitionsInMem.getAndIncrement(); + // Swapping partitions to disk if addition of this partition violates the + // requirement on the number of partitions. + if (numPartitionsInMem.get() > maxPartitionsInMem.get()) { + swapOnePartitionToDisk(); + } + return true; + } + + @Override + public void shutdown() { + // Sanity check to check there is nothing left from previous superstep + if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() || + !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() || + !unProcessedPartitions.get(State.ON_DISK).isEmpty()) { + throw new IllegalStateException("shutdown: There are some " + + "unprocessed partitions left from the " + + "previous superstep. 
This should not be possible"); + } + + for (MetaPartition meta : partitions.values()) { + synchronized (meta) { + while (meta.getState() == State.IN_TRANSIT) { + try { + meta.wait(); + } catch (InterruptedException e) { + // Restore the interrupt flag and keep the cause for diagnosis + Thread.currentThread().interrupt(); + throw new IllegalStateException("shutdown: exception while " + + "waiting on an IN_TRANSIT partition to be written on disk", e); + } + } + if (meta.getState() == State.ON_DISK) { + deletePartitionFiles(meta.getId()); + } + } + } + + if (oocEngine != null) { + oocEngine.shutdown(); + } + } + + @Override + public void startIteration() { + if (!isInitialized.get()) { + initialize(); + } + if (LOG.isInfoEnabled()) { + LOG.info("startIteration: with " + numPartitionsInMem.get() + + " partitions in memory, there can be maximum " + maxPartitionsInMem + + " partitions in memory out of " + partitions.size() + " that " + + "belongs to this worker."); + } + // Sanity check to make sure nothing left unprocessed from previous + // superstep + if (!unProcessedPartitions.get(State.INACTIVE).isEmpty() || + !unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() || + !unProcessedPartitions.get(State.ON_DISK).isEmpty()) { + throw new IllegalStateException("startIteration: There are some " + + "unprocessed and/or in-transition partitions left from the " + + "previous superstep. This should not be possible"); + } + + rwLock.writeLock().lock(); + // The sanity checks below may throw; the finally block guarantees that the + // write lock is released in that case as well. + try { + for (MetaPartition meta : partitions.values()) { + // Sanity check + if (!meta.isProcessed()) { + throw new IllegalStateException("startIteration: meta-partition " + + meta + " has not been processed in the previous superstep."); + } + // The only case where a partition can be IN_TRANSIT is where it is still + // being offloaded to disk, and it happens only in swapOnePartitionToDisk, + // where we at least hold a read lock while transfer is in progress. Since + // the write lock is held in this section, no partition should be + // IN_TRANSIT.
+ if (meta.getState() == State.IN_TRANSIT) { + throw new IllegalStateException("startIteration: meta-partition " + + meta + " is still IN_TRANSIT (impossible)"); + } + + meta.setProcessed(false); + } + + unProcessedPartitions.clear(); + unProcessedPartitions.putAll(processedPartitions); + processedPartitions.clear(); + processedPartitions + .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet()); + processedPartitions + .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet()); + processedPartitions + .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet()); + } finally { + rwLock.writeLock().unlock(); + } + LOG.info("startIteration: done preparing the iteration"); + } + + @Override + public void moveEdgesToVertices() { + movingEdges = true; + // Always reset the flag: while it is set, pending input vertex buffers are + // neither reported as spill candidates nor spilled. + try { + edgeStore.moveEdgesToVertices(); + } finally { + movingEdges = false; + } + } + + /** + * Pops an entry from the specified set. It is guaranteed that the set is + * being accessed from within a lock. + * + * @param set set to pop an entry from + * @return popped entry from the given set, or null if the set is empty + */ + private Integer popFromSet(Set<Integer> set) { + if (!set.isEmpty()) { + Iterator<Integer> it = set.iterator(); + Integer id = it.next(); + it.remove(); + return id; + } + return null; + } + + /** + * Allows more partitions to be stored in memory. + * + * @param numPartitionsToIncrease How many more partitions to allow in the + * store + */ + public void increasePartitionSlots(Integer numPartitionsToIncrease) { + maxPartitionsInMem.getAndAdd(numPartitionsToIncrease); + if (LOG.isInfoEnabled()) { + LOG.info("increasePartitionSlots: allowing partition store to have " + + numPartitionsToIncrease + " more partitions. Now, partition store " + + "can have up to " + maxPartitionsInMem.get() + " partitions."); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (MetaPartition e : partitions.values()) { + sb.append(e).append("\n"); + } + return sb.toString(); + } + + /** + * Writes vertex data (Id, value and halted state) to stream. 
+ * + * @param output The output stream + * @param vertex The vertex to serialize + * @throws IOException if writing to the output fails + */ + private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex) + throws IOException { + + vertex.getId().write(output); + vertex.getValue().write(output); + output.writeBoolean(vertex.isHalted()); + } + + /** + * Writes vertex edges (Id, edges) to stream. The vertex id is written first + * so that the edges can be matched to their vertex when read back. + * + * @param output The output stream + * @param vertex The vertex to serialize + * @throws IOException if writing to the output fails + */ + @SuppressWarnings("unchecked") + private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex) + throws IOException { + + vertex.getId().write(output); + OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges(); + edges.write(output); + } + + /** + * Read vertex data from an input and initialize the vertex. The vertex is + * initialized with out-edges obtained from + * conf.createAndInitializeOutEdges(0); the boolean read last decides + * whether the vertex votes to halt or is woken up. + * + * @param in The input stream + * @param vertex The vertex to initialize + * @throws IOException if reading from the input fails + */ + private void readVertexData(DataInput in, Vertex<I, V, E> vertex) + throws IOException { + + I id = conf.createVertexId(); + id.readFields(in); + V value = conf.createVertexValue(); + value.readFields(in); + OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0); + vertex.initialize(id, value, edges); + if (in.readBoolean()) { + vertex.voteToHalt(); + } else { + vertex.wakeUp(); + } + } + + /** + * Read vertex edges from an input and set them to the vertex. The vertex is + * looked up in the partition by the id read from the input, its out-edges + * are read in place, and the vertex is saved back to the partition. + * + * @param in The input stream + * @param partition The partition owning the vertex + * @throws IOException if reading from the input fails + */ + @SuppressWarnings("unchecked") + private void readOutEdges(DataInput in, Partition<I, V, E> partition) + throws IOException { + + I id = conf.createVertexId(); + id.readFields(in); + Vertex<I, V, E> v = partition.getVertex(id); + OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges(); + edges.readFields(in); + partition.saveVertex(v); + } + + /** + * Load a partition from disk. It deletes the files after the load, + * except for the edges, if the graph is static. 
+ * + * @param id The id of the partition to load + * @param numVertices The number of vertices contained on disk + * @return The partition + * @throws IOException + */ + @SuppressWarnings("unchecked") + private Partition<I, V, E> loadPartition(int id, long numVertices) + throws IOException { + Partition<I, V, E> partition = conf.createPartition(id, context); + + // Vertices + File file = new File(getVerticesPath(id)); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading partition vertices " + + partition.getId() + " from " + file.getAbsolutePath()); + } + + FileInputStream filein = new FileInputStream(file); + BufferedInputStream bufferin = new BufferedInputStream(filein); + DataInputStream inputStream = new DataInputStream(bufferin); + for (int i = 0; i < numVertices; ++i) { + Vertex<I, V , E> vertex = conf.createVertex(); + readVertexData(inputStream, vertex); + partition.putVertex(vertex); + } + inputStream.close(); + if (!file.delete()) { + String msg = "loadPartition: failed to delete " + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + // Edges + file = new File(getEdgesPath(id)); + + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading partition edges " + + partition.getId() + " from " + file.getAbsolutePath()); + } + + filein = new FileInputStream(file); + bufferin = new BufferedInputStream(filein); + inputStream = new DataInputStream(bufferin); + for (int i = 0; i < numVertices; ++i) { + readOutEdges(inputStream, partition); + } + inputStream.close(); + // If the graph is static and it is not INPUT_SUPERSTEP, keep the file + // around. 
+ if (!conf.isStaticGraph() || + serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { + if (!file.delete()) { + String msg = + "loadPartition: failed to delete " + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { + // Input vertex buffers + // First, applying vertex buffers on disk (since they came earlier) + Integer numBuffers = numPendingInputVerticesOnDisk.remove(id); + if (numBuffers != null) { + file = new File(getPendingVerticesBufferPath(id)); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading " + numBuffers + " input vertex " + + "buffers of partition " + id + " from " + file.getAbsolutePath()); + } + filein = new FileInputStream(file); + bufferin = new BufferedInputStream(filein); + inputStream = new DataInputStream(bufferin); + for (int i = 0; i < numBuffers; ++i) { + ExtendedDataOutput extendedDataOutput = + WritableUtils.readExtendedDataOutput(inputStream, conf); + partition.addPartitionVertices( + new VertexIterator<I, V, E>(extendedDataOutput, conf)); + } + inputStream.close(); + if (!file.delete()) { + String msg = + "loadPartition: failed to delete " + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + // Second, applying vertex buffers already in memory + Pair<Integer, List<ExtendedDataOutput>> vertexPair; + vertexBufferRWLock.writeLock().lock(); + vertexPair = pendingInputVertices.remove(id); + vertexBufferRWLock.writeLock().unlock(); + if (vertexPair != null) { + for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) { + partition.addPartitionVertices( + new VertexIterator<I, V, E>(extendedDataOutput, conf)); + } + } + + // Edge store + if (!hasEdgeStoreOnDisk.containsKey(id)) { + throw new IllegalStateException("loadPartition: partition is written" + + " to disk in INPUT_SUPERSTEP, but it is not clear whether its " + + "edge store is on disk or not 
(impossible)"); + } + if (hasEdgeStoreOnDisk.remove(id)) { + file = new File(getEdgeStorePath(id)); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading edge store of partition " + id + + " from " + file.getAbsolutePath()); + } + filein = new FileInputStream(file); + bufferin = new BufferedInputStream(filein); + inputStream = new DataInputStream(bufferin); + edgeStore.readPartitionEdgeStore(id, inputStream); + inputStream.close(); + if (!file.delete()) { + String msg = + "loadPartition: failed to delete " + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + // Input edge buffers + // First, applying edge buffers on disk (since they came earlier) + numBuffers = numPendingInputEdgesOnDisk.remove(id); + if (numBuffers != null) { + file = new File(getPendingEdgesBufferPath(id)); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartition: loading " + numBuffers + " input edge " + + "buffers of partition " + id + " from " + file.getAbsolutePath()); + } + filein = new FileInputStream(file); + bufferin = new BufferedInputStream(filein); + inputStream = new DataInputStream(bufferin); + for (int i = 0; i < numBuffers; ++i) { + VertexIdEdges<I, E> vertexIdEdges = + new ByteArrayVertexIdEdges<I, E>(); + vertexIdEdges.setConf(conf); + vertexIdEdges.readFields(inputStream); + edgeStore.addPartitionEdges(id, vertexIdEdges); + } + inputStream.close(); + if (!file.delete()) { + String msg = + "loadPartition: failed to delete " + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + // Second, applying edge buffers already in memory + Pair<Integer, List<VertexIdEdges<I, E>>> edgePair = null; + edgeBufferRWLock.writeLock().lock(); + edgePair = pendingInputEdges.remove(id); + edgeBufferRWLock.writeLock().unlock(); + if (edgePair != null) { + for (VertexIdEdges<I, E> vertexIdEdges : edgePair.getRight()) { + edgeStore.addPartitionEdges(id, vertexIdEdges); + } + } + } + return partition; + } + + 
/** + * Write a partition to disk. + * + * @param meta meta partition containing the partition to offload + * @throws IOException + */ + private void offloadPartition(MetaPartition meta) throws IOException { + Partition<I, V, E> partition = meta.getPartition(); + int partitionId = meta.getId(); + File file = new File(getVerticesPath(partitionId)); + File parent = file.getParentFile(); + if (!parent.exists() && !parent.mkdirs() && LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: directory " + parent.getAbsolutePath() + + " already exists."); + } + + if (!file.createNewFile()) { + String msg = "offloadPartition: file " + parent.getAbsolutePath() + + " already exists."; + LOG.error(msg); + throw new IllegalStateException(msg); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: writing partition vertices " + + partitionId + " to " + file.getAbsolutePath()); + } + + FileOutputStream fileout = new FileOutputStream(file); + BufferedOutputStream bufferout = new BufferedOutputStream(fileout); + DataOutputStream outputStream = new DataOutputStream(bufferout); + for (Vertex<I, V, E> vertex : partition) { + writeVertexData(outputStream, vertex); + } + outputStream.close(); + + // Avoid writing back edges if we have already written them once and + // the graph is not changing. + // If we are in the input superstep, we need to write the files + // at least the first time, even though the graph is static. 
+ file = new File(getEdgesPath(partitionId)); + if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP || + meta.getPrevVertexCount() != partition.getVertexCount() || + !conf.isStaticGraph() || !file.exists()) { + + meta.setPrevVertexCount(partition.getVertexCount()); + + if (!file.createNewFile() && LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: file " + file.getAbsolutePath() + + " already exists."); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: writing partition edges " + + partitionId + " to " + file.getAbsolutePath()); + } + + fileout = new FileOutputStream(file); + bufferout = new BufferedOutputStream(fileout); + outputStream = new DataOutputStream(bufferout); + for (Vertex<I, V, E> vertex : partition) { + writeOutEdges(outputStream, vertex); + } + outputStream.close(); + } + + // Writing edge store to disk in the input superstep + if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) { + if (edgeStore.hasPartitionEdges(partitionId)) { + hasEdgeStoreOnDisk.put(partitionId, true); + file = new File(getEdgeStorePath(partitionId)); + if (!file.createNewFile() && LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: file " + file.getAbsolutePath() + + " already exists."); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("offloadPartition: writing partition edge store of " + + partitionId + " to " + file.getAbsolutePath()); + } + + fileout = new FileOutputStream(file); + bufferout = new BufferedOutputStream(fileout); + outputStream = new DataOutputStream(bufferout); + edgeStore.writePartitionEdgeStore(partitionId, outputStream); + outputStream.close(); + } else { + hasEdgeStoreOnDisk.put(partitionId, false); + } + } + } + + /** + * Delete a partition's files. + * + * @param id The id of the partition owning the file. 
+ */ + public void deletePartitionFiles(Integer id) { + // File containing vertices + File file = new File(getVerticesPath(id)); + if (file.exists() && !file.delete()) { + String msg = "deletePartitionFiles: Failed to delete file " + + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + + // File containing edges + file = new File(getEdgesPath(id)); + if (file.exists() && !file.delete()) { + String msg = "deletePartitionFiles: Failed to delete file " + + file.getAbsolutePath(); + LOG.error(msg); + throw new IllegalStateException(msg); + } + } + + /** + * Get the path and basename of the storage files. The base directory is + * selected from basePaths by hashing the partition id, and the basename + * is "partition-" followed by the partition id. + * + * @param partitionId The partition + * @return The path to the given partition + */ + private String getPartitionPath(Integer partitionId) { + int hash = hasher.hashInt(partitionId).asInt(); + int idx = Math.abs(hash % basePaths.length); + return basePaths[idx] + "/partition-" + partitionId; + } + + /** + * Get the path to the file where vertices are stored. + * + * @param partitionId The partition + * @return The path to the vertices file + */ + private String getVerticesPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_vertices"; + } + + /** + * Get the path to the file where pending vertices in INPUT_SUPERSTEP + * are stored. + * + * @param partitionId The partition + * @return The path to the pending vertices file + */ + private String getPendingVerticesBufferPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_pending_vertices"; + } + + /** + * Get the path to the file where edge store of a partition in INPUT_SUPERSTEP + * is stored. + * + * @param partitionId The partition + * @return The path to the edge store file + */ + private String getEdgeStorePath(Integer partitionId) { + return getPartitionPath(partitionId) + "_edge_store"; + } + + /** + * Get the path to the file where pending edges in INPUT_SUPERSTEP + * are stored. 
+ * + * @param partitionId The partition + * @return The path to the pending edges file + */ + private String getPendingEdgesBufferPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_pending_edges"; + } + + /** + * Get the path to the file where edges are stored. + * + * @param partitionId The partition + * @return The path to the edges file + */ + private String getEdgesPath(Integer partitionId) { + return getPartitionPath(partitionId) + "_edges"; + } + + /** + * Partition container holding additional meta data associated with each + * partition. + */ + private class MetaPartition { + // ---- META INFORMATION ---- + /** ID of the partition */ + private int id; + /** State in which the partition is */ + private State state; + /** Number of vertices contained in the partition */ + private long vertexCount; + /** Previous number of vertices contained in the partition */ + private long prevVertexCount; + /** Number of edges contained in the partition */ + private long edgeCount; + /** + * Whether the partition is already processed in the current iteration + * cycle + */ + private boolean isProcessed; + + // ---- PARTITION ---- + /** The actual partition. Depending on the state of the partition, + this reference could be null. */ + private Partition<I, V, E> partition; + + /** + * Initialization of the metadata enriched partition. The container starts + * in the INIT state, with all counters set to 0, not processed and without + * a partition object. + * + * @param id id of the partition + */ + public MetaPartition(int id) { + this.id = id; + this.state = State.INIT; + this.vertexCount = 0; + this.prevVertexCount = 0; + this.edgeCount = 0; + this.isProcessed = false; + + this.partition = null; + } + + /** + * @return the id + */ + public int getId() { + return id; + } + + /** + * @return the state + */ + public State getState() { + return state; + } + + /** + * This function sets the metadata for on-disk partition. 
+ */ + public void setOnDisk() { + this.state = State.ON_DISK; + this.vertexCount = partition.getVertexCount(); + this.edgeCount = partition.getEdgeCount(); + this.partition = null; + } + + /** + * Marks the partition as ACTIVE. The current vertex count is remembered as + * the previous vertex count and then reset to 0. + * + * @param partition the partition associated to this container; if null, + * the partition currently held is kept + */ + public void setActive(Partition<I, V, E> partition) { + if (partition != null) { + this.partition = partition; + } + this.state = State.ACTIVE; + this.prevVertexCount = this.vertexCount; + this.vertexCount = 0; + } + + /** + * @param state the state to set + */ + public void setState(State state) { + this.state = state; + } + + /** + * Set previous number of vertices + * @param vertexCount number of vertices + */ + public void setPrevVertexCount(long vertexCount) { + this.prevVertexCount = vertexCount; + } + + /** + * @return the prevVertexCount + */ + public long getPrevVertexCount() { + return prevVertexCount; + } + + /** + * @return the vertexCount + */ + public long getVertexCount() { + return vertexCount; + } + + /** + * @return the edgeCount + */ + public long getEdgeCount() { + return edgeCount; + } + + /** + * @return true iff the partition is marked as processed. 
+ */ + public boolean isProcessed() { + return isProcessed; + } + + /** + * Set the state of this partition in terms of being already processed or + * not + * @param isProcessed whether the partition is processed or not + */ + public void setProcessed(boolean isProcessed) { + this.isProcessed = isProcessed; + } + + /** + * @return the partition + */ + public Partition<I, V, E> getPartition() { + return partition; + } + + /** + * @param partition the partition to set + */ + public void setPartition(Partition<I, V, E> partition) { + this.partition = partition; + } + + /** + * @return a single-line description of all meta data fields followed by + * the partition, enclosed in one pair of braces + */ + @Override + public String toString() { + // The buffer never leaves this method, so the unsynchronized + // StringBuilder is sufficient. + StringBuilder sb = new StringBuilder(); + + sb.append("Meta Data: { "); + sb.append("ID: ").append(id).append("; "); + sb.append("State: ").append(state).append("; "); + sb.append("Number of Vertices: ").append(vertexCount).append("; "); + sb.append("Previous number of Vertices: ").append(prevVertexCount) + .append("; "); + sb.append("Number of edges: ").append(edgeCount).append("; "); + // No closing brace here: the only one follows the partition below + sb.append("Is processed: ").append(isProcessed).append("; "); + sb.append("Partition: ").append(partition).append("; }"); + + return sb.toString(); + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java b/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java new file mode 100644 index 0000000..46d989a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/JVMMemoryEstimator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.utils.MemoryUtils; + +/** + * Memory estimator class using JVM runtime methods to estimate the + * free/available memory. 
+ */ +public class JVMMemoryEstimator implements MemoryEstimator { + /** + * No-argument constructor, needed to create the estimator via reflection + */ + public JVMMemoryEstimator() { } + + @Override + public void initialize(CentralizedServiceWorker serviceWorker) { } + + @Override + public double freeMemoryMB() { + return MemoryUtils.freePlusUnallocatedMemoryMB(); + } + + @Override public double maxMemoryMB() { + return MemoryUtils.maxMemoryMB(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java b/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java new file mode 100644 index 0000000..74a2f7b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/MemoryEstimator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import org.apache.giraph.bsp.CentralizedServiceWorker; + +/** + * Interface for memory estimator. Estimated memory is used in adaptive + * out-of-core mechanism. 
+ */ +public interface MemoryEstimator { + /** + * Initialize the memory estimator. + * + * @param serviceWorker Worker service + */ + void initialize(CentralizedServiceWorker serviceWorker); + + /** + * @return amount of free memory in MB + */ + double freeMemoryMB(); + + /** + * @return maximum amount of memory in MB + */ + double maxMemoryMB(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java new file mode 100644 index 0000000..218a10b --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Engine for out-of-core mechanism. The engine may attach to a partition store + * capable of handling disk transfer of partitions. 
+ *
+ * NOTE(review): the interface does not state whether initialize() must
+ * precede shutdown() or whether either call may be repeated; confirm against
+ * the implementations.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ */
+public interface OutOfCoreEngine<I extends WritableComparable,
+    V extends Writable, E extends Writable> {
+  /**
+   * Initialize and start the out-of-core engine.
+   */
+  void initialize();
+
+  /**
+   * Shut down and stop the out-of-core engine.
+   */
+  void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
new file mode 100644
index 0000000..d58ebe0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreProcessorCallable.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Class to implement slaves for adaptive out-of-core brain. Basically the brain
+ * decides on when to offload and what data to offload to disk and generates
+ * commands for offloading. Slaves just execute the commands. Commands can be:
+ *   1) offloading vertex buffer of partitions in INPUT_SUPERSTEP,
+ *   2) offloading edge buffer of partitions in INPUT_SUPERSTEP,
+ *   3) offloading partitions.
+ *
+ * Every failure while waiting on the engine or while spilling is rethrown as
+ * an {@link IllegalStateException} that carries the original exception as its
+ * cause.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public class OutOfCoreProcessorCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable> implements Callable<Void> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(OutOfCoreProcessorCallable.class);
+  /** Partition store */
+  private final DiskBackedPartitionStore<I, V, E> partitionStore;
+  /** Adaptive out-of-core engine */
+  private final AdaptiveOutOfCoreEngine<I, V, E> oocEngine;
+
+  /**
+   * Constructor for out-of-core processor threads.
+   *
+   * @param oocEngine out-of-core engine
+   * @param serviceWorker worker service; its partition store is cast to
+   *                      {@link DiskBackedPartitionStore}, so any other store
+   *                      type fails here with a ClassCastException
+   */
+  public OutOfCoreProcessorCallable(AdaptiveOutOfCoreEngine<I, V, E> oocEngine,
+      CentralizedServiceWorker<I, V, E> serviceWorker) {
+    this.oocEngine = oocEngine;
+    this.partitionStore =
+        (DiskBackedPartitionStore<I, V, E>) serviceWorker.getPartitionStore();
+  }
+
+  /**
+   * Repeatedly waits on the engine's gate, spills the input vertex buffers,
+   * input edge stores and partitions the engine has queued, and then waits on
+   * the engine's out-of-core signal. The loop ends once the engine reports
+   * that it is done.
+   *
+   * @return always null
+   * @throws IllegalStateException if waiting on the engine is interrupted, if
+   *         a barrier is broken, or if spilling a buffer fails with an
+   *         IOException; the original exception is attached as the cause
+   */
+  @Override
+  public Void call() {
+    while (true) {
+      // First wait on a gate to be opened by memory-check thread. Memory-check
+      // thread opens the gate once there are data available to be spilled to
+      // disk.
+      try {
+        oocEngine.waitOnGate();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so the interruption is not lost once
+        // it has been translated into an unchecked exception.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("call: Caught InterruptedException " +
+            "while waiting on memory check thread signal on available " +
+            "partitions to put on disk", e);
+      } catch (BrokenBarrierException e) {
+        throw new IllegalStateException("call Caught BrokenBarrierException. " +
+            "Looks like some other threads broke while waiting on barrier", e);
+      }
+
+      // The value of 'done' is true iff it is set right before the gate
+      // at end of check-memory thread. In such case, the computation is done
+      // and OOC processing threads should terminate gracefully.
+      if (oocEngine.isDone()) {
+        break;
+      }
+
+      BlockingQueue<Integer> partitionsWithInputVertices =
+          oocEngine.getPartitionsWithInputVertices();
+      BlockingQueue<Integer> partitionsWithInputEdges =
+          oocEngine.getPartitionsWithInputEdges();
+      AtomicInteger numPartitionsToSpill =
+          oocEngine.getNumPartitionsToSpill();
+
+      while (!partitionsWithInputVertices.isEmpty()) {
+        // The queue may have been emptied between isEmpty() and poll(), in
+        // which case poll() returns null.
+        Integer partitionId = partitionsWithInputVertices.poll();
+        if (partitionId == null) {
+          break;
+        }
+        LOG.info("call: spilling vertex buffer of partition " + partitionId);
+        try {
+          partitionStore.spillPartitionInputVertexBuffer(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("call: caught IOException while " +
+              "spilling vertex buffers to disk", e);
+        }
+      }
+
+      while (!partitionsWithInputEdges.isEmpty()) {
+        // Same null check as above: poll() returns null on an empty queue.
+        Integer partitionId = partitionsWithInputEdges.poll();
+        if (partitionId == null) {
+          break;
+        }
+        LOG.info("call: spilling edge buffer of partition " + partitionId);
+        try {
+          partitionStore.spillPartitionInputEdgeStore(partitionId);
+        } catch (IOException e) {
+          throw new IllegalStateException("call: caught IOException while " +
+              "spilling edge buffers/store to disk", e);
+        }
+      }
+
+      // Put partitions on disk
+      while (numPartitionsToSpill.getAndDecrement() > 0) {
+        LOG.info("call: start offloading a partition");
+        partitionStore.spillOnePartition();
+      }
+
+      // Signal memory check thread that I am done putting data on disk
+      try {
+        oocEngine.waitOnOocSignal();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status before translating the exception.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("call: Caught InterruptedException " +
+            "while waiting to notify memory check thread that I am done", e);
+      } catch (BrokenBarrierException e) {
+        throw new IllegalStateException("call: Caught BrokenBarrierException " +
+            "while waiting to notify memory check thread that I am done", e);
+      }
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/03ade425/giraph-core/src/main/java/org/apache/giraph/ooc/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/package-info.java
new file mode 100644
index 0000000..7509479
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of out-of-core related classes.
+ */
+package org.apache.giraph.ooc;
