http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java new file mode 100644 index 0000000..6c08dfd --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -0,0 +1,947 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.data; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.worker.BspServiceWorker; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Class to keep meta-information about partition data, edge data, and message + * data of each partition on a worker. + */ +public class MetaPartitionManager { + /** + * Flag representing no partitions is left to process in the current iteration + * cycle over all partitions. + */ + public static final int NO_PARTITION_TO_PROCESS = -1; + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(MetaPartitionManager.class); + /** Different storage states for data */ + private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT }; + /** + * Different processing states for partitions. Processing states are reset + * at the beginning of each iteration cycle over partitions. 
+ */ + private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS }; + + /** Number of in-memory partitions */ + private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0); + /** Map of partitions to their meta information */ + private final ConcurrentMap<Integer, MetaPartition> partitions = + Maps.newConcurrentMap(); + /** List of partitions assigned to each IO threads */ + private final List<PerThreadPartitionStatus> perThreadPartitions; + /** For each IO thread, set of partition ids that are on-disk and have + * 'large enough' vertex/edge buffers to be offloaded on disk + */ + private final List<Set<Integer>> perThreadVertexEdgeBuffers; + /** + * For each IO thread, set of partition ids that are on-disk and have + * 'large enough' message buffers to be offloaded on disk + */ + private final List<Set<Integer>> perThreadMessageBuffers; + /** + * Out-of-core engine + */ + private final OutOfCoreEngine oocEngine; + /** + * Number of processed partitions in the current iteration cycle over all + * partitions + */ + private final AtomicInteger numPartitionsProcessed = new AtomicInteger(0); + /** + * Random number generator to choose a thread to get one of its partition for + * processing + */ + private final Random randomGenerator; + + /** + * Constructor + * + * @param numIOThreads number of IO threads + * @param oocEngine out-of-core engine + */ + public MetaPartitionManager(int numIOThreads, OutOfCoreEngine oocEngine) { + perThreadPartitions = new ArrayList<>(numIOThreads); + perThreadVertexEdgeBuffers = new ArrayList<>(numIOThreads); + perThreadMessageBuffers = new ArrayList<>(numIOThreads); + for (int i = 0; i < numIOThreads; ++i) { + perThreadPartitions.add(new PerThreadPartitionStatus()); + perThreadMessageBuffers.add(Sets.<Integer>newConcurrentHashSet()); + perThreadVertexEdgeBuffers.add(Sets.<Integer>newConcurrentHashSet()); + } + this.oocEngine = oocEngine; + this.randomGenerator = new Random(); + } + + /** + * Get number of partitions 
in memory + * + * @return number of partitions in memory + */ + public int getNumInMemoryPartitions() { + return numInMemoryPartitions.get(); + } + + /** + * Get total number of partitions + * + * @return total number of partition + */ + public int getNumPartitions() { + return partitions.size(); + } + + /** + * Whether a given partition is available + * + * @param partitionId id of the partition to check if this worker owns it + * @return true if the worker owns the partition, false otherwise + */ + public boolean hasPartition(Integer partitionId) { + return partitions.containsKey(partitionId); + } + + /** + * Return the list of all available partitions as an iterable + * + * @return list of all available partitions + */ + public Iterable<Integer> getPartitionIds() { + return partitions.keySet(); + } + + /** + * Add a partition + * + * @param partitionId id of a partition to add + */ + public void addPartition(int partitionId) { + MetaPartition meta = new MetaPartition(partitionId); + MetaPartition temp = partitions.putIfAbsent(partitionId, meta); + // Check if the given partition is new + if (temp == null) { + int ownerThread = oocEngine.getIOScheduler() + .getOwnerThreadId(partitionId); + Set<MetaPartition> partitionSet = + perThreadPartitions.get(ownerThread).getInMemoryProcessed(); + synchronized (partitionSet) { + partitionSet.add(meta); + } + numInMemoryPartitions.getAndIncrement(); + } + } + + /** + * Remove a partition. This method assumes that the partition is already + * retrieved and is in memory) + * + * @param partitionId id of a partition to remove + */ + public void removePartition(Integer partitionId) { + MetaPartition meta = partitions.remove(partitionId); + checkState(!meta.isOnDisk()); + numInMemoryPartitions.getAndDecrement(); + } + + /** + * Pops an entry from the specified set. 
+ * + * @param set set to pop an entry from + * @param <T> Type of entries in the set + * @return popped entry from the given set + */ + private static <T> T popFromSet(Set<T> set) { + if (!set.isEmpty()) { + Iterator<T> it = set.iterator(); + T entry = it.next(); + it.remove(); + return entry; + } + return null; + } + + /** + * Peeks an entry from the specified set. + * + * @param set set to peek an entry from + * @param <T> Type of entries in the set + * @return peeked entry from the given set + */ + private static <T> T peekFromSet(Set<T> set) { + if (!set.isEmpty()) { + return set.iterator().next(); + } + return null; + } + + /** + * Get id of a partition to offload on disk + * + * @param threadId id of the thread who is going to store the partition on + * disk + * @return id of the partition to offload on disk + */ + public Integer getOffloadPartitionId(int threadId) { + Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId) + .getInMemoryProcessed(); + synchronized (partitionSet) { + MetaPartition meta = peekFromSet(partitionSet); + if (meta != null) { + return meta.getPartitionId(); + } + } + partitionSet = perThreadPartitions.get(threadId).getInMemoryUnprocessed(); + synchronized (partitionSet) { + MetaPartition meta = peekFromSet(partitionSet); + if (meta != null) { + return meta.getPartitionId(); + } + } + return null; + } + + /** + * Get id of a partition to offload its vertex/edge buffers on disk + * + * @param threadId id of the thread who is going to store the buffers on disk + * @return id of the partition to offload its vertex/edge buffers on disk + */ + public Integer getOffloadPartitionBufferId(int threadId) { + if (oocEngine.getServiceWorker().getSuperstep() == + BspServiceWorker.INPUT_SUPERSTEP) { + Integer partitionId = + popFromSet(perThreadVertexEdgeBuffers.get(threadId)); + if (partitionId == null) { + DiskBackedPartitionStore<?, ?, ?> partitionStore = + (DiskBackedPartitionStore<?, ?, ?>) (oocEngine.getServerData() + 
.getPartitionStore()); + perThreadVertexEdgeBuffers.get(threadId) + .addAll(partitionStore.getCandidateBuffersToOffload()); + DiskBackedEdgeStore<?, ?, ?> edgeStore = + (DiskBackedEdgeStore<?, ?, ?>) (oocEngine.getServerData()) + .getEdgeStore(); + perThreadVertexEdgeBuffers.get(threadId) + .addAll(edgeStore.getCandidateBuffersToOffload()); + partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId)); + } + return partitionId; + } + return null; + } + + /** + * Get id of a partition to offload its incoming message buffers on disk + * + * @param threadId id of the thread who is going to store the buffers on disk + * @return id of the partition to offload its message buffer on disk + */ + public Integer getOffloadMessageBufferId(int threadId) { + if (oocEngine.getServiceWorker().getSuperstep() != + BspServiceWorker.INPUT_SUPERSTEP) { + Integer partitionId = + popFromSet(perThreadMessageBuffers.get(threadId)); + if (partitionId == null) { + DiskBackedMessageStore<?, ?> messageStore = + (DiskBackedMessageStore<?, ?>) (oocEngine.getServerData() + .getIncomingMessageStore()); + if (messageStore != null) { + perThreadMessageBuffers.get(threadId) + .addAll(messageStore.getCandidateBuffersToOffload()); + partitionId = popFromSet(perThreadMessageBuffers.get(threadId)); + } + } + return partitionId; + } + return null; + } + + /** + * Get id of a partition to offload its incoming message on disk + * + * @param threadId id of the thread who is going to store the incoming + * messages on disk + * @return id of the partition to offload its message on disk + */ + public Integer getOffloadMessageId(int threadId) { + Set<MetaPartition> partitionSet = perThreadPartitions.get(threadId) + .getInDiskProcessed(); + synchronized (partitionSet) { + for (MetaPartition meta : partitionSet) { + if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { + return meta.getPartitionId(); + } + } + } + partitionSet = perThreadPartitions.get(threadId).getInDiskUnprocessed(); + 
synchronized (partitionSet) { + for (MetaPartition meta : partitionSet) { + if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { + return meta.getPartitionId(); + } + } + } + return null; + } + + /** + * Get id of a partition to prefetch its data to memory + * + * @param threadId id of the thread who is going to load the partition data + * @return id of the partition to load its data to memory + */ + public Integer getPrefetchPartitionId(int threadId) { + Set<MetaPartition> partitionSet = + perThreadPartitions.get(threadId).getInDiskUnprocessed(); + synchronized (partitionSet) { + MetaPartition meta = peekFromSet(partitionSet); + return (meta != null) ? meta.getPartitionId() : null; + } + } + + /** + * Mark a partition inaccessible to IO and compute threads + * + * @param partitionId id of the partition to mark + */ + public void makePartitionInaccessible(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + perThreadPartitions.get(oocEngine.getIOScheduler() + .getOwnerThreadId(partitionId)) + .remove(meta); + synchronized (meta) { + meta.setProcessingState(ProcessingState.IN_PROCESS); + } + } + + /** + * Mark a partition as 'PROCESSED' + * + * @param partitionId id of the partition to mark + */ + public void setPartitionIsProcessed(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + synchronized (meta) { + meta.setProcessingState(ProcessingState.PROCESSED); + } + Set<MetaPartition> partitionSet = perThreadPartitions + .get(oocEngine.getIOScheduler().getOwnerThreadId(partitionId)) + .getInMemoryProcessed(); + synchronized (partitionSet) { + partitionSet.add(meta); + } + numPartitionsProcessed.getAndIncrement(); + } + + /** + * Notify this meta store that load of a partition for a specific superstep + * is about to start. 
+ * + * @param partitionId id of the partition to load to memory + * @param superstep superstep in which the partition is needed for + * @return true iff load of the given partition is viable + */ + public boolean startLoadingPartition(int partitionId, long superstep) { + MetaPartition meta = partitions.get(partitionId); + synchronized (meta) { + boolean shouldLoad = meta.getPartitionState() == StorageState.ON_DISK; + if (superstep == oocEngine.getServiceWorker().getSuperstep()) { + shouldLoad |= meta.getCurrentMessagesState() == StorageState.ON_DISK; + } else { + shouldLoad |= meta.getIncomingMessagesState() == StorageState.ON_DISK; + } + return shouldLoad; + } + } + + /** + * Notify this meta store that load of a partition for a specific superstep + * is completed + * + * @param partitionId id of a the partition that load is completed + * @param superstep superstep in which the partition is loaded for + */ + public void doneLoadingPartition(int partitionId, long superstep) { + MetaPartition meta = partitions.get(partitionId); + numInMemoryPartitions.getAndIncrement(); + int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + boolean removed = perThreadPartitions.get(owner) + .remove(meta, StorageState.ON_DISK); + if (removed || meta.getProcessingState() == ProcessingState.IN_PROCESS) { + synchronized (meta) { + meta.setPartitionState(StorageState.IN_MEM); + if (superstep == oocEngine.getServiceWorker().getSuperstep()) { + meta.setCurrentMessagesState(StorageState.IN_MEM); + } else { + meta.setIncomingMessagesState(StorageState.IN_MEM); + } + } + perThreadPartitions.get(owner).add(meta, StorageState.IN_MEM); + } + } + + /** + * Notify this meta store that offload of messages for a particular partition + * is about to start. 
+ * + * @param partitionId id of the partition that its messages is being offloaded + * @return true iff offload of messages of the given partition is viable + */ + public boolean startOffloadingMessages(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + synchronized (meta) { + if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { + meta.setIncomingMessagesState(StorageState.IN_TRANSIT); + return true; + } else { + return false; + } + } + } + + /** + * Notify this meta store that offload of messages for a particular partition + * is complete. + * + * @param partitionId id of the partition that its messages is offloaded to + * disk + */ + public void doneOffloadingMessages(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + synchronized (meta) { + meta.setIncomingMessagesState(StorageState.ON_DISK); + } + } + + /** + * Notify this meta store that offload of raw data buffers (vertex/edges/ + * messages) of a particular partition is about to start. + * + * @param partitionId id of the partition that its buffer is being offloaded + * @return true iff offload of buffers of the given partition is viable + */ + public boolean startOffloadingBuffer(int partitionId) { + // Do nothing + return true; + } + + /** + * Notify this meta store that offload of raw data buffers (vertex/edges/ + * messages) of a particular partition is completed. + * + * @param partitionId id of the partition that its buffer is offloaded + */ + public void doneOffloadingBuffer(int partitionId) { + // Do nothing + } + + /** + * Notify this meta store that offload of a partition (partition data and its + * current messages) is about to start. 
+ * + * @param partitionId id of the partition that its data is being offloaded + * @return true iff offload of the given partition is viable + */ + public boolean startOffloadingPartition(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + boolean removed = perThreadPartitions.get(owner) + .remove(meta, StorageState.IN_MEM); + if (removed) { + synchronized (meta) { + meta.setPartitionState(StorageState.IN_TRANSIT); + meta.setCurrentMessagesState(StorageState.IN_TRANSIT); + } + perThreadPartitions.get(owner).add(meta, StorageState.IN_TRANSIT); + return true; + } else { + return false; + } + } + + /** + * Notify this meta store that offload of a partition (partition data and its + * current messages) is completed. + * + * @param partitionId id of the partition that its data is offloaded + */ + public void doneOffloadingPartition(int partitionId) { + numInMemoryPartitions.getAndDecrement(); + MetaPartition meta = partitions.get(partitionId); + int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + boolean removed = perThreadPartitions.get(owner) + .remove(meta, StorageState.IN_TRANSIT); + if (removed) { + synchronized (meta) { + meta.setPartitionState(StorageState.ON_DISK); + meta.setCurrentMessagesState(StorageState.ON_DISK); + } + perThreadPartitions.get(owner).add(meta, StorageState.ON_DISK); + } + } + + /** + * Reset the meta store for a new iteration cycle over all partitions. + * Note: this is not thread-safe and should be called from a single thread. + */ + public void resetPartition() { + for (MetaPartition meta : partitions.values()) { + meta.resetPartition(); + } + int numPartition = 0; + for (PerThreadPartitionStatus status : perThreadPartitions) { + numPartition += status.reset(); + } + checkState(numPartition == partitions.size()); + numPartitionsProcessed.set(0); + } + + /** + * Reset messages in the meta store. 
+ * Note: this is not thread-safe and should be called from a single thread. + */ + public void resetMessages() { + for (MetaPartition meta : partitions.values()) { + meta.resetMessages(); + } + // After swapping incoming messages and current messages, it may be the case + // that a partition has data in memory (partitionState == IN_MEM), but now + // its current messages are on disk (currentMessageState == ON_DISK). So, we + // have to mark the partition as ON_DISK, and load its messages once it is + // about to be processed. + for (PerThreadPartitionStatus status : perThreadPartitions) { + Set<MetaPartition> partitionSet = status.getInMemoryUnprocessed(); + Iterator<MetaPartition> it = partitionSet.iterator(); + while (it.hasNext()) { + MetaPartition meta = it.next(); + if (meta.getCurrentMessagesState() == StorageState.ON_DISK) { + it.remove(); + status.getInDiskUnprocessed().add(meta); + numInMemoryPartitions.getAndDecrement(); + } + } + partitionSet = status.getInMemoryProcessed(); + it = partitionSet.iterator(); + while (it.hasNext()) { + MetaPartition meta = it.next(); + if (meta.getCurrentMessagesState() == StorageState.ON_DISK) { + it.remove(); + status.getInDiskProcessed().add(meta); + numInMemoryPartitions.getAndDecrement(); + } + } + } + } + + /** + * Return the id of an unprocessed partition in memory. If all partitions are + * processed, return an appropriate 'finisher signal'. If there are + * unprocessed partitions, but none are is memory, return null. + * + * @return id of the partition to be processed next. 
+ */ + public Integer getNextPartition() { + if (numPartitionsProcessed.get() >= partitions.size()) { + return NO_PARTITION_TO_PROCESS; + } + int numThreads = perThreadPartitions.size(); + int index = randomGenerator.nextInt(numThreads); + int startIndex = index; + do { + Set<MetaPartition> partitionSet = + perThreadPartitions.get(index).getInMemoryUnprocessed(); + MetaPartition meta; + synchronized (partitionSet) { + meta = popFromSet(partitionSet); + } + if (meta != null) { + synchronized (meta) { + meta.setProcessingState(ProcessingState.IN_PROCESS); + return meta.getPartitionId(); + } + } + index = (index + 1) % numThreads; + } while (index != startIndex); + return null; + } + + /** + * Whether a partition is on disk (both its data and its current messages) + * + * @param partitionId id of the partition to check if it is on disk + * @return true if partition data or its current messages are on disk, false + * otherwise + */ + public boolean isPartitionOnDisk(int partitionId) { + MetaPartition meta = partitions.get(partitionId); + return meta.isOnDisk(); + } + + /** + * Representation of meta information of a partition + */ + private static class MetaPartition { + /** Id of the partition */ + private int partitionId; + /** Storage state of incoming messages */ + private StorageState incomingMessagesState; + /** Storage state of current messages */ + private StorageState currentMessagesState; + /** Storage state of partition data */ + private StorageState partitionState; + /** Processing state of a partition */ + private volatile ProcessingState processingState; + + /** + * Constructor + * + * @param partitionId id of the partition + */ + MetaPartition(int partitionId) { + this.partitionId = partitionId; + this.processingState = ProcessingState.PROCESSED; + this.partitionState = StorageState.IN_MEM; + this.currentMessagesState = StorageState.IN_MEM; + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("\nMetaData: 
{"); + sb.append("ID: " + partitionId + "; "); + sb.append("Partition: " + partitionState + "; "); + sb.append("Current Messages: " + currentMessagesState + "; "); + sb.append("Incoming Messages: " + incomingMessagesState + "; "); + sb.append("Processed? : " + processingState + "}"); + return sb.toString(); + } + + /** + * Get id of the partition + * + * @return id of the partition + */ + public int getPartitionId() { + return partitionId; + } + + /** + * Get storage state of incoming messages of the partition + * + * @return storage state of incoming messages + */ + public StorageState getIncomingMessagesState() { + return incomingMessagesState; + } + + /** + * Set storage state of incoming messages of the partition + * + * @param incomingMessagesState storage state of incoming messages + */ + public void setIncomingMessagesState(StorageState incomingMessagesState) { + this.incomingMessagesState = incomingMessagesState; + } + + /** + * Get storage state of current messages of the partition + * + * @return storage state of current messages + */ + public StorageState getCurrentMessagesState() { + return currentMessagesState; + } + + /** + * Set storage state of current messages of the partition + * + * @param currentMessagesState storage state of current messages + */ + public void setCurrentMessagesState(StorageState currentMessagesState) { + this.currentMessagesState = currentMessagesState; + } + + /** + * Get storage state of the partition + * + * @return storage state of the partition + */ + public StorageState getPartitionState() { + return partitionState; + } + + /** + * Set storage state of the partition + * + * @param state storage state of the partition + */ + public void setPartitionState(StorageState state) { + this.partitionState = state; + } + + public ProcessingState getProcessingState() { + return processingState; + } + + public void setProcessingState(ProcessingState processingState) { + this.processingState = processingState; + } + + /** + * Whether 
the partition is on disk (either its data or its current + * messages) + * + * @return true if the partition is on disk, false otherwise + */ + public boolean isOnDisk() { + return partitionState == StorageState.ON_DISK || + currentMessagesState == StorageState.ON_DISK; + } + + /** + * Reset the partition meta information for the next iteration cycle + */ + public void resetPartition() { + processingState = ProcessingState.UNPROCESSED; + } + + /** + * Reset messages meta information for the next iteration cycle + */ + public void resetMessages() { + currentMessagesState = incomingMessagesState; + incomingMessagesState = StorageState.IN_MEM; + } + } + + /** + * Representation of partitions' state per IO thread + */ + private static class PerThreadPartitionStatus { + /** + * Contains partitions that has been processed in the current iteration + * cycle, and are not in use by any thread. + */ + private Map<StorageState, Set<MetaPartition>> + processedPartitions = Maps.newConcurrentMap(); + /** + * Contains partitions that has *NOT* been processed in the current + * iteration cycle, and are not in use by any thread. 
+ */ + private Map<StorageState, Set<MetaPartition>> + unprocessedPartitions = Maps.newConcurrentMap(); + + /** + * Constructor + */ + public PerThreadPartitionStatus() { + processedPartitions.put(StorageState.IN_MEM, + Sets.<MetaPartition>newLinkedHashSet()); + processedPartitions.put(StorageState.ON_DISK, + Sets.<MetaPartition>newLinkedHashSet()); + processedPartitions.put(StorageState.IN_TRANSIT, + Sets.<MetaPartition>newLinkedHashSet()); + + unprocessedPartitions.put(StorageState.IN_MEM, + Sets.<MetaPartition>newLinkedHashSet()); + unprocessedPartitions.put(StorageState.ON_DISK, + Sets.<MetaPartition>newLinkedHashSet()); + unprocessedPartitions.put(StorageState.IN_TRANSIT, + Sets.<MetaPartition>newLinkedHashSet()); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("\nProcessed Partitions: " + processedPartitions + "; "); + sb.append("\nUnprocessedPartitions: " + unprocessedPartitions); + return sb.toString(); + } + + /** + * Get set of partitions that are in memory and are processed + * + * @return set of partition that are in memory are are processed + */ + public Set<MetaPartition> getInMemoryProcessed() { + return processedPartitions.get(StorageState.IN_MEM); + } + + /** + * Get set of partitions that are in memory are are not processed + * + * @return set of partitions that are in memory and are not processed + */ + public Set<MetaPartition> getInMemoryUnprocessed() { + return unprocessedPartitions.get(StorageState.IN_MEM); + } + + /** + * Get set of partitions that are on disk and are processed + * + * @return set of partitions that are on disk and are processed + */ + public Set<MetaPartition> getInDiskProcessed() { + return processedPartitions.get(StorageState.ON_DISK); + } + + /** + * Get set of partitions that are on disk and are not processed + * + * @return set of partitions that are on disk and are not processed + */ + public Set<MetaPartition> getInDiskUnprocessed() { + return 
unprocessedPartitions.get(StorageState.ON_DISK); + } + + /** + * Remove a partition from meta information + * + * @param meta meta-information of a partition to be removed + */ + public void remove(MetaPartition meta) { + Set<MetaPartition> partitionSet; + partitionSet = processedPartitions.get(StorageState.IN_MEM); + synchronized (partitionSet) { + if (partitionSet.remove(meta)) { + return; + } + } + partitionSet = unprocessedPartitions.get(StorageState.IN_MEM); + synchronized (partitionSet) { + if (partitionSet.remove(meta)) { + return; + } + } + partitionSet = processedPartitions.get(StorageState.IN_TRANSIT); + synchronized (partitionSet) { + if (partitionSet.remove(meta)) { + return; + } + } + partitionSet = unprocessedPartitions.get(StorageState.IN_TRANSIT); + synchronized (partitionSet) { + if (partitionSet.remove(meta)) { + return; + } + } + partitionSet = processedPartitions.get(StorageState.ON_DISK); + synchronized (partitionSet) { + if (partitionSet.remove(meta)) { + return; + } + } + partitionSet = unprocessedPartitions.get(StorageState.ON_DISK); + synchronized (partitionSet) { + partitionSet.remove(meta); + } + } + + /** + * Reset meta-information for the next iteration cycle over all partitions + * + * @return total number of partitions kept for this thread + */ + public int reset() { + checkState(unprocessedPartitions.get(StorageState.IN_MEM).size() == 0); + checkState(unprocessedPartitions.get(StorageState.IN_TRANSIT).size() == + 0); + checkState(unprocessedPartitions.get(StorageState.ON_DISK).size() == 0); + unprocessedPartitions.clear(); + unprocessedPartitions.putAll(processedPartitions); + processedPartitions.clear(); + processedPartitions.put(StorageState.IN_MEM, + Sets.<MetaPartition>newLinkedHashSet()); + processedPartitions.put(StorageState.ON_DISK, + Sets.<MetaPartition>newLinkedHashSet()); + processedPartitions.put(StorageState.IN_TRANSIT, + Sets.<MetaPartition>newLinkedHashSet()); + return 
unprocessedPartitions.get(StorageState.IN_MEM).size() + + unprocessedPartitions.get(StorageState.IN_TRANSIT).size() + + unprocessedPartitions.get(StorageState.ON_DISK).size(); + } + + /** + * Remove a partition from partition set of a given state + * + * @param meta meta partition to remove + * @param state state from which the partition should be removed + * @return true iff the partition is actually removed + */ + public boolean remove(MetaPartition meta, StorageState state) { + boolean removed = false; + Set<MetaPartition> partitionSet = null; + if (meta.getProcessingState() == ProcessingState.UNPROCESSED) { + partitionSet = unprocessedPartitions.get(state); + } else if (meta.getProcessingState() == ProcessingState.PROCESSED) { + partitionSet = processedPartitions.get(state); + } else { + LOG.info("remove: partition " + meta.getPartitionId() + " is " + + "already being processed! This should happen only if partition " + + "removal is done before start of an iteration over all partitions"); + } + if (partitionSet != null) { + synchronized (partitionSet) { + removed = partitionSet.remove(meta); + } + } + return removed; + } + + /** + * Add a partition to partition set of a given state + * + * @param meta meta partition to add + * @param state state to which the partition should be added + */ + public void add(MetaPartition meta, StorageState state) { + Set<MetaPartition> partitionSet = null; + if (meta.getProcessingState() == ProcessingState.UNPROCESSED) { + partitionSet = unprocessedPartitions.get(state); + } else if (meta.getProcessingState() == ProcessingState.PROCESSED) { + partitionSet = processedPartitions.get(state); + } else { + LOG.info("add: partition " + meta.getPartitionId() + " is already " + + "being processed!"); + } + if (partitionSet != null) { + synchronized (partitionSet) { + partitionSet.add(meta); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java new file mode 100644 index 0000000..7d97e51 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.data; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.giraph.conf.GiraphConstants.ONE_MB; + +/** + * This class provides basic operations for data structures that have to + * participate in out-of-core mechanism. Essential subclasses of this class are: + * - DiskBackedPartitionStore (for partition data) + * - DiskBackedMessageStore (for messages) + * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP) + * Basically, any data structure that may cause OOM to happen can be implemented + * as a subclass of this class. + * + * There are two different terms used in the rest of this class: + * - "data store" refers to in-memory representation of data. Usually this is + * stored per-partition in in-memory implementations of data structures. For + * instance, "data store" of a DiskBackedPartitionStore would collection of + * all partitions kept in the in-memory partition store within the + * DiskBackedPartitionStore. 
+ * - "raw data buffer" refers to raw data which were supposed to be + * de-serialized and added to the data store, but they remain 'as is' in the + * memory because their corresponding partition is offloaded to disk and is + * not available in the data store. + * + * @param <T> raw data format of the data store subclassing this class + */ +public abstract class OutOfCoreDataManager<T> { + /** + * Minimum size of a buffer (in bytes) to flush to disk. This is used to + * decide whether vertex/edge buffers are large enough to flush to disk. + */ + public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = + new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, + "Minimum size of a buffer (in bytes) to flush to disk."); + + /** Class logger. */ + private static final Logger LOG = Logger.getLogger( + OutOfCoreDataManager.class); + /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ + private final int minBufferSizeToOffload; + /** Set containing ids of all out-of-core partitions */ + private final Set<Integer> hasPartitionDataOnDisk = + Sets.newConcurrentHashSet(); + /** + * Map of partition ids to list of raw data buffers. The map will have entries + * only for partitions that their in-memory data structures are currently + * offloaded to disk. We keep the aggregate size of buffers for each partition + * as part of the values in the map to estimate how much memory we can free up + * if we offload data buffers of a particular partition to disk. + */ + private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = + Maps.newConcurrentMap(); + /** + * Map of partition ids to number of raw data buffers offloaded to disk for + * each partition. The map will have entries only for partitions that their + * in-memory data structures are currently out of core. It is necessary to + * know the number of data buffers on disk for a particular partition when we + * are loading all these buffers back in memory. 
+ */ + private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = + Maps.newConcurrentMap(); + /** + * Lock to avoid overlapping of read and write on data associated with each + * partition. + * */ + private final ConcurrentMap<Integer, ReadWriteLock> locks = + Maps.newConcurrentMap(); + + /** + * Constructor. + * + * @param conf Configuration + */ + OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) { + this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); + } + + /** + * Retrieves a lock for a given partition. If the lock for the given partition + * does not exist, creates a new lock. + * + * @param partitionId id of the partition the lock is needed for + * @return lock for a given partition + */ + private ReadWriteLock getPartitionLock(int partitionId) { + ReadWriteLock readWriteLock = locks.get(partitionId); + if (readWriteLock == null) { + readWriteLock = new ReentrantReadWriteLock(); + ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock); + if (temp != null) { + readWriteLock = temp; + } + } + return readWriteLock; + } + + /** + * Adds a data entry for a given partition to the current data store. If data + * of a given partition in data store is already offloaded to disk, adds the + * data entry to appropriate raw data buffer list. + * + * @param partitionId id of the partition to add the data entry to + * @param entry data entry to add + */ + protected void addEntry(int partitionId, T entry) { + // Addition of data entries to a data store is much more common than + // out-of-core operations. Besides, in-memory data store implementations + // existing in the code base already account for parallel addition to data + // stores. Therefore, using read lock would optimize for parallel addition + // to data stores, specially for cases where the addition should happen for + // partitions that are entirely in memory. 
+ ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.readLock().lock(); + if (hasPartitionDataOnDisk.contains(partitionId)) { + List<T> entryList = new ArrayList<>(); + entryList.add(entry); + int entrySize = entrySerializedSize(entry); + MutablePair<Integer, List<T>> newPair = + new MutablePair<>(entrySize, entryList); + Pair<Integer, List<T>> oldPair = + dataBuffers.putIfAbsent(partitionId, newPair); + if (oldPair != null) { + synchronized (oldPair) { + newPair = (MutablePair<Integer, List<T>>) oldPair; + newPair.setLeft(oldPair.getLeft() + entrySize); + newPair.getRight().add(entry); + } + } + } else { + addEntryToImMemoryPartitionData(partitionId, entry); + } + rwLock.readLock().unlock(); + } + + /** + * Loads and assembles all data for a given partition, and put it into the + * data store. + * + * @param partitionId id of the partition to load ana assemble all data for + * @param basePath path to load the data from + * @throws IOException + */ + public void loadPartitionData(int partitionId, String basePath) + throws IOException { + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + if (hasPartitionDataOnDisk.contains(partitionId)) { + loadInMemoryPartitionData(partitionId, getPath(basePath, partitionId)); + hasPartitionDataOnDisk.remove(partitionId); + // Loading raw data buffers from disk if there is any and applying those + // to already loaded in-memory data. 
+ Integer numBuffers = numDataBuffersOnDisk.remove(partitionId); + if (numBuffers != null) { + checkState(numBuffers > 0); + File file = new File(getBuffersPath(basePath, partitionId)); + checkState(file.exists()); + if (LOG.isDebugEnabled()) { + LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" + + " partition " + partitionId + " from " + file.getAbsolutePath()); + } + FileInputStream fis = new FileInputStream(file); + BufferedInputStream bis = new BufferedInputStream(fis); + DataInputStream dis = new DataInputStream(bis); + for (int i = 0; i < numBuffers; ++i) { + T entry = readNextEntry(dis); + addEntryToImMemoryPartitionData(partitionId, entry); + } + dis.close(); + checkState(file.delete(), "loadPartitionData: failed to delete %s.", + file.getAbsoluteFile()); + } + // Applying in-memory raw data buffers to in-memory partition data. + Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId); + if (pair != null) { + for (T entry : pair.getValue()) { + addEntryToImMemoryPartitionData(partitionId, entry); + } + } + } + rwLock.writeLock().unlock(); + } + + /** + * Offloads partition data of a given partition in the data store to disk + * + * @param partitionId id of the partition to offload its data + * @param basePath path to offload the data to + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "UL_UNRELEASED_LOCK_EXCEPTION_PATH") + public void offloadPartitionData(int partitionId, String basePath) + throws IOException { + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + hasPartitionDataOnDisk.add(partitionId); + rwLock.writeLock().unlock(); + offloadInMemoryPartitionData(partitionId, getPath(basePath, partitionId)); + } + + /** + * Offloads raw data buffers of a given partition to disk + * + * @param partitionId id of the partition to offload its raw data buffers + * @param basePath path to offload the data to + * @throws IOException + */ + public void offloadBuffers(int 
partitionId, String basePath) + throws IOException { + Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); + if (pair == null || pair.getLeft() < minBufferSizeToOffload) { + return; + } + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + pair = dataBuffers.remove(partitionId); + rwLock.writeLock().unlock(); + checkNotNull(pair); + checkState(!pair.getRight().isEmpty()); + File file = new File(getBuffersPath(basePath, partitionId)); + FileOutputStream fos = new FileOutputStream(file, true); + BufferedOutputStream bos = new BufferedOutputStream(fos); + DataOutputStream dos = new DataOutputStream(bos); + for (T entry : pair.getRight()) { + writeEntry(entry, dos); + } + dos.close(); + int numBuffers = pair.getRight().size(); + Integer oldNumBuffersOnDisk = + numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); + if (oldNumBuffersOnDisk != null) { + numDataBuffersOnDisk.replace(partitionId, + oldNumBuffersOnDisk + numBuffers); + } + } + + /** + * Looks through all partitions that their data is not in the data store (is + * offloaded to disk), and sees if any of them has enough raw data buffer in + * memory. If so, puts that partition in a list to return. + * + * @return Set of partition ids of all partition raw buffers where the + * aggregate size of buffers are large enough and it is worth flushing + * those buffers to disk + */ + public Set<Integer> getCandidateBuffersToOffload() { + Set<Integer> result = new HashSet<>(); + for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : + dataBuffers.entrySet()) { + if (entry.getValue().getLeft() > minBufferSizeToOffload) { + result.add(entry.getKey()); + } + } + return result; + } + + /** + * Creates the path to read/write partition data from/to for a given + * partition. 
+ * + * @param basePath path prefix to create the actual path from + * @param partitionId id of the partition + * @return path to read/write data from/to + */ + private static String getPath(String basePath, int partitionId) { + return basePath + "-P" + partitionId; + } + + /** + * Creates the path to read/write raw data buffers of a given partition + * from/to. + * + * @param basePath path prefix to create the actual path from + * @param partitionId id of the partition + * @return path to read/write raw data buffer to/from + */ + private static String getBuffersPath(String basePath, int partitionId) { + return getPath(basePath, partitionId) + "_buffers"; + } + + /** + * Writes a single raw entry to a given output stream. + * + * @param entry entry to write to output + * @param out output stream to write the entry to + * @throws IOException + */ + protected abstract void writeEntry(T entry, DataOutput out) + throws IOException; + + /** + * Reads the next available raw entry from a given input stream. + * + * @param in input stream to read the entry from + * @return entry read from an input stream + * @throws IOException + */ + protected abstract T readNextEntry(DataInput in) throws IOException; + + /** + * Loads data of a partition into data store. + * + * @param partitionId id of the partition to load its data + * @param path path from which data should be loaded + * @throws IOException + */ + protected abstract void loadInMemoryPartitionData(int partitionId, + String path) + throws IOException; + + /** + * Offloads data of a partition in data store to disk. + * + * @param partitionId id of the partition to offload to disk + * @param path path to which data should be offloaded + * @throws IOException + */ + protected abstract void offloadInMemoryPartitionData(int partitionId, + String path) + throws IOException; + + /** + * Gets the size of a given entry in bytes. 
+ * + * @param entry input entry to find its size + * @return size of given input entry in bytes + */ + protected abstract int entrySerializedSize(T entry); + + /** + * Adds a single entry for a given partition to the in-memory data store. + * + * @param partitionId id of the partition to add the data to + * @param entry input entry to add to the data store + */ + protected abstract void addEntryToImMemoryPartitionData(int partitionId, + T entry); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java new file mode 100644 index 0000000..5260f2c --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Package of classes related to data structures used for an out-of-core + * mechanism + */ +package org.apache.giraph.ooc.data; http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java new file mode 100644 index 0000000..eb6d2c9 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.io; + +import org.apache.giraph.ooc.OutOfCoreEngine; + +import java.io.IOException; + +/** + * Representation of an IO command (moving data to disk/memory) used in + * out-of-core mechanism. 
+ */ +public abstract class IOCommand { + /** Id of the partition involved for the IO */ + protected final int partitionId; + /** Out-of-core engine */ + protected final OutOfCoreEngine oocEngine; + + /** + * Constructor + * + * @param oocEngine Out-of-core engine + * @param partitionId Id of the partition involved in the IO + */ + public IOCommand(OutOfCoreEngine oocEngine, int partitionId) { + this.oocEngine = oocEngine; + this.partitionId = partitionId; + } + + /** + * GEt the id of the partition involved in the IO + * + * @return id of the partition + */ + public int getPartitionId() { + return partitionId; + } + + /** + * Execute (load/store of data) the IO command, and change the data stores + * appropriately based on the data loaded/stored. + * + * @param basePath the base path (prefix) to the files/folders IO command + * should read/write data from/to + * @throws IOException + */ + public abstract void execute(String basePath) throws IOException; +} + http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java new file mode 100644 index 0000000..c28a0da --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/LoadPartitionIOCommand.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.io;

import com.google.common.base.Preconditions;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;

import java.io.IOException;

/**
 * IOCommand to load partition data, edge data (if in INPUT_SUPERSTEP), and
 * message data (if in compute supersteps). Also, this command can be used to
 * prefetch a partition to be processed in the next superstep.
 */
public class LoadPartitionIOCommand extends IOCommand {
  /**
   * Which superstep this partition should be loaded for? (can be current
   * superstep or next superstep -- in case of prefetching).
   */
  private final long superstep;

  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param partitionId id of the partition to be loaded
   * @param superstep superstep to load the partition for
   */
  public LoadPartitionIOCommand(OutOfCoreEngine oocEngine, int partitionId,
                                long superstep) {
    super(oocEngine, partitionId);
    this.superstep = superstep;
  }

  @Override
  public void execute(String basePath) throws IOException {
    // Nothing to do unless the meta partition manager approves the load.
    if (!oocEngine.getMetaPartitionManager()
        .startLoadingPartition(partitionId, superstep)) {
      return;
    }
    long currentSuperstep = oocEngine.getServiceWorker().getSuperstep();
    // Partition (vertex) data is always brought back first.
    DiskBackedPartitionStore partitionStore =
        (DiskBackedPartitionStore)
            oocEngine.getServerData().getPartitionStore();
    partitionStore.loadPartitionData(partitionId, basePath);
    // Edge data lives in a separate store only during input superstep.
    if (currentSuperstep == BspService.INPUT_SUPERSTEP &&
        superstep == currentSuperstep) {
      DiskBackedEdgeStore edgeStore =
          (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
      edgeStore.loadPartitionData(partitionId, basePath);
    }
    MessageStore messageStore = messageStoreFor(currentSuperstep);
    if (messageStore != null) {
      ((DiskBackedMessageStore) messageStore)
          .loadPartitionData(partitionId, basePath);
    }
    oocEngine.getMetaPartitionManager()
        .doneLoadingPartition(partitionId, superstep);
  }

  /**
   * Picks the message store the load applies to: the current store when
   * loading for the current superstep, the incoming store when prefetching
   * for the next superstep.
   *
   * @param currentSuperstep the superstep the worker is currently in
   * @return message store to load into (may be null)
   */
  private MessageStore messageStoreFor(long currentSuperstep) {
    if (superstep == currentSuperstep) {
      return oocEngine.getServerData().getCurrentMessageStore();
    }
    Preconditions.checkState(superstep == currentSuperstep + 1);
    return oocEngine.getServerData().getIncomingMessageStore();
  }

  @Override
  public String toString() {
    return "LoadPartitionIOCommand: (partitionId = " + partitionId + ", " +
        "superstep = " + superstep + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java ---------------------------------------------------------------------- diff --git
a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java new file mode 100644 index 0000000..41a0682 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreDataBufferIOCommand.java @@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.io;

import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;

import java.io.IOException;

/**
 * IOCommand to store raw data buffers on disk.
 */
public class StoreDataBufferIOCommand extends IOCommand {
  /**
   * Types of raw data buffer to offload to disk (either vertices/edges buffer
   * in INPUT_SUPERSTEP or incoming message buffer).
   */
  public enum DataBufferType { PARTITION, MESSAGE };
  /** Type of the buffer to store on disk. */
  private final DataBufferType type;

  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param partitionId id of the partition to offload its buffers
   * @param type type of the buffer to store on disk
   */
  public StoreDataBufferIOCommand(OutOfCoreEngine oocEngine,
                                  int partitionId,
                                  DataBufferType type) {
    super(oocEngine, partitionId);
    this.type = type;
  }

  @Override
  public void execute(String basePath) throws IOException {
    // Nothing to do unless the meta partition manager approves the offload.
    if (!oocEngine.getMetaPartitionManager()
        .startOffloadingBuffer(partitionId)) {
      return;
    }
    switch (type) {
    case PARTITION:
      offloadPartitionBuffers(basePath);
      break;
    case MESSAGE:
      offloadMessageBuffers(basePath);
      break;
    default:
      throw new IllegalStateException("execute: requested data buffer type " +
          "does not exist!");
    }
    oocEngine.getMetaPartitionManager().doneOffloadingBuffer(partitionId);
  }

  /**
   * Offloads buffered vertex and edge raw data of the partition to disk.
   *
   * @param basePath base path to write buffers to
   * @throws IOException
   */
  private void offloadPartitionBuffers(String basePath) throws IOException {
    DiskBackedPartitionStore partitionStore =
        (DiskBackedPartitionStore)
            oocEngine.getServerData().getPartitionStore();
    partitionStore.offloadBuffers(partitionId, basePath);
    DiskBackedEdgeStore edgeStore =
        (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
    edgeStore.offloadBuffers(partitionId, basePath);
  }

  /**
   * Offloads buffered incoming-message raw data of the partition to disk.
   *
   * @param basePath base path to write buffers to
   * @throws IOException
   */
  private void offloadMessageBuffers(String basePath) throws IOException {
    DiskBackedMessageStore messageStore =
        (DiskBackedMessageStore)
            oocEngine.getServerData().getIncomingMessageStore();
    messageStore.offloadBuffers(partitionId, basePath);
  }

  @Override
  public String toString() {
    return "StoreDataBufferIOCommand: (partitionId = " + partitionId + ", " +
        "type = " + type.name() + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java new file mode 100644 index 0000000..9c1c0a2 --- /dev/null +++
b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StoreIncomingMessageIOCommand.java @@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.io;

import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;

import java.io.IOException;

import static com.google.common.base.Preconditions.checkState;

/**
 * IOCommand to store incoming message of a particular partition.
 */
public class StoreIncomingMessageIOCommand extends IOCommand {
  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param partitionId id of the partition to store its incoming messages
   */
  public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine,
                                       int partitionId) {
    super(oocEngine, partitionId);
  }

  @Override
  public void execute(String basePath) throws IOException {
    // Nothing to do unless the meta partition manager approves the offload.
    if (!oocEngine.getMetaPartitionManager()
        .startOffloadingMessages(partitionId)) {
      return;
    }
    DiskBackedMessageStore incomingStore =
        (DiskBackedMessageStore)
            oocEngine.getServerData().getIncomingMessageStore();
    checkState(incomingStore != null);
    incomingStore.offloadPartitionData(partitionId, basePath);
    oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId);
  }

  @Override
  public String toString() {
    return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java new file mode 100644 index 0000000..77955dc --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/StorePartitionIOCommand.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.io;

import org.apache.giraph.bsp.BspService;
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.ooc.data.DiskBackedEdgeStore;
import org.apache.giraph.ooc.data.DiskBackedMessageStore;
import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
import org.apache.giraph.ooc.OutOfCoreEngine;

import java.io.IOException;

/**
 * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and
 * message data (if in compute supersteps).
 */
public class StorePartitionIOCommand extends IOCommand {
  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param partitionId id of the partition to store its data
   */
  public StorePartitionIOCommand(OutOfCoreEngine oocEngine,
                                 int partitionId) {
    super(oocEngine, partitionId);
  }

  @Override
  public void execute(String basePath) throws IOException {
    // Nothing to do unless the meta partition manager approves the offload.
    if (!oocEngine.getMetaPartitionManager()
        .startOffloadingPartition(partitionId)) {
      return;
    }
    DiskBackedPartitionStore partitionStore =
        (DiskBackedPartitionStore)
            oocEngine.getServerData().getPartitionStore();
    partitionStore.offloadPartitionData(partitionId, basePath);
    boolean inInputSuperstep = oocEngine.getServiceWorker().getSuperstep() ==
        BspService.INPUT_SUPERSTEP;
    if (inInputSuperstep) {
      // During input superstep, edges are kept in a separate edge store.
      DiskBackedEdgeStore edgeStore =
          (DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
      edgeStore.offloadPartitionData(partitionId, basePath);
    } else {
      // During compute supersteps, the partition's current messages go along.
      MessageStore currentMessages =
          oocEngine.getServerData().getCurrentMessageStore();
      if (currentMessages != null) {
        ((DiskBackedMessageStore) currentMessages)
            .offloadPartitionData(partitionId, basePath);
      }
    }
    oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId);
  }

  @Override
  public String toString() {
    return "StorePartitionIOCommand: (partitionId = " + partitionId + ")";
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java new file mode 100644 index 0000000..b6e0546 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/WaitIOCommand.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.io; + +import org.apache.giraph.ooc.OutOfCoreEngine; + +import java.io.IOException; + +import java.util.concurrent.TimeUnit; + +/** + * IOCommand to do nothing regarding moving data to/from disk. + */ +public class WaitIOCommand extends IOCommand { + /** How long should the disk be idle?
(in milliseconds) */
  private final long waitDuration;

  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param waitDuration duration of wait
   */
  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
    // -1 marks that no particular partition is involved in this command.
    super(oocEngine, -1);
    this.waitDuration = waitDuration;
  }

  @Override
  public void execute(String basePath) throws IOException {
    try {
      TimeUnit.MILLISECONDS.sleep(waitDuration);
    } catch (InterruptedException e) {
      // Bug fix: restore the thread's interrupt status so callers up the
      // stack can still observe the interruption, and chain the original
      // exception as the cause instead of discarding it.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("execute: caught InterruptedException " +
          "while IO thread is waiting!", e);
    }
  }

  @Override
  public String toString() {
    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
  }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java new file mode 100644 index 0000000..2230ec4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/io/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of classes related to IO operations in out-of-core mechanism + */ +package org.apache.giraph.ooc.io; http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java deleted file mode 100644 index d6e3a70..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionData.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.partition; - -import org.apache.giraph.utils.ExtendedDataOutput; -import org.apache.giraph.utils.VertexIdEdges; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -/** - * Structure that keeps partition information. 
- * - * @param <I> Vertex id - * @param <V> Vertex value - * @param <E> Edge value - */ -public interface PartitionData<I extends WritableComparable, - V extends Writable, E extends Writable> { - /** - * Add a *new* partition to the store. If the partition is already existed, - * it does not add the partition and returns false. - * - * @param partition Partition to add - * @return Whether the addition made any change in the partition store - */ - boolean addPartition(Partition<I, V, E> partition); - - /** - * Remove a partition and return it. Called from a single thread, *not* from - * within a scheduling cycle, and after INPUT_SUPERSTEP is complete. - * - * @param partitionId Partition id - * @return The removed partition - */ - Partition<I, V, E> removePartition(Integer partitionId); - - /** - * Whether a specific partition is present in the store. - * - * @param partitionId Partition id - * @return True iff the partition is present - */ - boolean hasPartition(Integer partitionId); - - /** - * Return the ids of all the stored partitions as an Iterable. - * - * @return The partition ids - */ - Iterable<Integer> getPartitionIds(); - - /** - * Return the number of stored partitions. - * - * @return The number of partitions - */ - int getNumPartitions(); - - /** - * Return the number of vertices in a partition. - * - * @param partitionId Partition id - * @return The number of vertices in the specified partition - */ - long getPartitionVertexCount(Integer partitionId); - - /** - * Return the number of edges in a partition. - * - * @param partitionId Partition id - * @return The number of edges in the specified partition - */ - long getPartitionEdgeCount(Integer partitionId); - - /** - * Whether the partition store is empty. - * - * @return True iff there are no partitions in the store - */ - boolean isEmpty(); - - /** - * Add vertices to a given partition from a given DataOutput instance. 
This - * method is called right after receipt of vertex request in INPUT_SUPERSTEP. - * - * @param partitionId Partition id - * @param extendedDataOutput Output containing serialized vertex data - */ - void addPartitionVertices(Integer partitionId, - ExtendedDataOutput extendedDataOutput); - - /** - * Add edges to a given partition from a given send edge request. This - * method is called right after receipt of edge request in INPUT_SUPERSTEP. - * - * @param partitionId Partition id - * @param edges Edges in the request - */ - void addPartitionEdges(Integer partitionId, VertexIdEdges<I, E> edges); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java index 2facff8..5ff15eb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java @@ -18,152 +18,106 @@ package org.apache.giraph.partition; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageData; -import org.apache.giraph.comm.messages.MessageStore; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.edge.EdgeStore; -import org.apache.giraph.edge.EdgeStoreFactory; -import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.ExtendedDataOutput; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.Mapper; - -import java.io.IOException; - -import static 
org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; /** * Structure that stores partitions for a worker. PartitionStore does not allow * random accesses to partitions except upon removal. - * This structure is thread-safe. + * This structure is thread-safe, unless otherwise specified. * * @param <I> Vertex id * @param <V> Vertex data * @param <E> Edge data */ -public abstract class PartitionStore<I extends WritableComparable, - V extends Writable, E extends Writable> - implements PartitionData<I, V, E>, MessageData<I> { - /** Configuration. */ - protected final ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Context used to report progress */ - protected final Mapper<?, ?, ?, ?>.Context context; - /** service worker reference */ - protected final CentralizedServiceWorker<I, V, E> serviceWorker; - - /** Edge store for this worker */ - protected final EdgeStore<I, V, E> edgeStore; - - /** Message store factory */ - protected MessageStoreFactory<I, Writable, MessageStore<I, Writable>> - messageStoreFactory; +public interface PartitionStore<I extends WritableComparable, + V extends Writable, E extends Writable> { + /** + * Add a *new* partition to the store. If the partition is already existed, + * it does not add the partition and returns false. + * Note: this method is not thread-safe and should be called by a single + * thread. + * + * @param partition Partition to add + * @return Whether the addition made any change in the partition store + */ + boolean addPartition(Partition<I, V, E> partition); + + /** + * Remove a partition and return it. Called from a single thread, *not* from + * within a scheduling cycle. This method should *not* be called in + * INPUT_SUPERSTEP. 
+ * + * @param partitionId Partition id + * @return The removed partition + */ + Partition<I, V, E> removePartition(Integer partitionId); + /** - * Message store for incoming messages (messages which will be consumed - * in the next super step) + * Whether a specific partition is present in the store. + * + * @param partitionId Partition id + * @return True iff the partition is present */ - protected volatile MessageStore<I, Writable> incomingMessageStore; + boolean hasPartition(Integer partitionId); + /** - * Message store for current messages (messages which we received in - * previous super step and which will be consumed in current super step) + * Return the ids of all the stored partitions as an Iterable. + * + * @return The partition ids */ - protected volatile MessageStore<I, Writable> currentMessageStore; + Iterable<Integer> getPartitionIds(); /** - * Constructor for abstract partition store + * Return the number of stored partitions. * - * @param conf Job configuration - * @param context Mapper context - * @param serviceWorker Worker service + * @return The number of partitions */ - public PartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf, - Mapper<?, ?, ?, ?>.Context context, - CentralizedServiceWorker<I, V, E> serviceWorker) { - this.conf = conf; - this.context = context; - this.serviceWorker = serviceWorker; - this.messageStoreFactory = createMessageStoreFactory(); - EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); - edgeStoreFactory.initialize(serviceWorker, conf, context); - this.edgeStore = edgeStoreFactory.newStore(); - } + int getNumPartitions(); /** - * Decide which message store should be used for current application, - * and create the factory for that store + * Return the number of vertices in a partition. 
* - * @return Message store factory + * @param partitionId Partition id + * @return The number of vertices in the specified partition */ - private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> - createMessageStoreFactory() { - Class<? extends MessageStoreFactory> messageStoreFactoryClass = - MESSAGE_STORE_FACTORY_CLASS.get(conf); - - MessageStoreFactory messageStoreFactoryInstance = - ReflectionUtils.newInstance(messageStoreFactoryClass); - messageStoreFactoryInstance.initialize(serviceWorker, conf); - - return messageStoreFactoryInstance; - } - - @Override - public boolean isEmpty() { - return getNumPartitions() == 0; - } - - @Override - public <M extends Writable> MessageStore<I, M> getIncomingMessageStore() { - return (MessageStore<I, M>) incomingMessageStore; - } - - @Override - public <M extends Writable> MessageStore<I, M> getCurrentMessageStore() { - return (MessageStore<I, M>) currentMessageStore; - } - - @Override - public void resetMessageStores() throws IOException { - if (currentMessageStore != null) { - currentMessageStore.clearAll(); - currentMessageStore = null; - } - if (incomingMessageStore != null) { - incomingMessageStore.clearAll(); - incomingMessageStore = null; - } - prepareSuperstep(); - } - - /** Prepare for next super step */ - public void prepareSuperstep() { - if (currentMessageStore != null) { - try { - currentMessageStore.clearAll(); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to clear previous message store"); - } - } - currentMessageStore = incomingMessageStore != null ? - incomingMessageStore : - messageStoreFactory.newStore(conf.getIncomingMessageClasses()); - incomingMessageStore = - messageStoreFactory.newStore(conf.getOutgoingMessageClasses()); - // finalize current message-store before resolving mutations - currentMessageStore.finalizeStore(); - } + long getPartitionVertexCount(Integer partitionId); + + /** + * Return the number of edges in a partition. 
+ * + * @param partitionId Partition id + * @return The number of edges in the specified partition + */ + long getPartitionEdgeCount(Integer partitionId); + + /** + * Whether the partition store is empty. + * + * @return True iff there are no partitions in the store + */ + boolean isEmpty(); + + /** + * Add vertices to a given partition from a given DataOutput instance. This + * method is called right after receipt of vertex request in INPUT_SUPERSTEP. + * + * @param partitionId Partition id + * @param extendedDataOutput Output containing serialized vertex data + */ + void addPartitionVertices(Integer partitionId, + ExtendedDataOutput extendedDataOutput); /** * Called at the end of the computation. Called from a single thread. */ - public void shutdown() { } + void shutdown(); /** * Called at the beginning of the computation. Called from a single thread. */ - public void initialize() { } + void initialize(); /** * Start the iteration cycle to iterate over partitions. Note that each @@ -194,7 +148,7 @@ public abstract class PartitionStore<I extends WritableComparable, * * Called from a single thread. */ - public abstract void startIteration(); + void startIteration(); /** * Return the next partition in iteration for the current superstep. @@ -204,7 +158,7 @@ public abstract class PartitionStore<I extends WritableComparable, * * @return The next partition to process */ - public abstract Partition<I, V, E> getNextPartition(); + Partition<I, V, E> getNextPartition(); /** * Put a partition back to the store. Use this method to put a partition @@ -213,21 +167,5 @@ public abstract class PartitionStore<I extends WritableComparable, * * @param partition Partition */ - public abstract void putPartition(Partition<I, V, E> partition); - - /** - * Move edges from edge store to partitions. This method is called from a - * *single thread* once all vertices and edges are read in INPUT_SUPERSTEP. 
- */ - public abstract void moveEdgesToVertices(); - - /** - * In case of async message store we have to wait for all messages - * to be processed before going into next superstep. - */ - public void waitForComplete() { - if (incomingMessageStore instanceof AsyncMessageStoreWrapper) { - ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete(); - } - } + void putPartition(Partition<I, V, E> partition); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java index 9f0c408..4846702 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java @@ -19,18 +19,13 @@ package org.apache.giraph.partition; import com.google.common.collect.Maps; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.ExtendedDataOutput; -import org.apache.giraph.utils.VertexIdEdges; -import org.apache.giraph.utils.VertexIdMessages; import org.apache.giraph.utils.VertexIterator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; @@ -46,7 +41,11 @@ import static com.google.common.base.Preconditions.checkState; */ public class SimplePartitionStore<I extends WritableComparable, V extends Writable, E extends Writable> - extends PartitionStore<I, V, E> { + 
implements PartitionStore<I, V, E> { + /** Configuration */ + private final ImmutableClassesGiraphConfiguration<I, V, E> conf; + /** Job context (for progress) */ + private final Mapper<?, ?, ?, ?>.Context context; /** Map of stored partitions. */ private final ConcurrentMap<Integer, Partition<I, V, E>> partitions = Maps.newConcurrentMap(); @@ -57,12 +56,11 @@ public class SimplePartitionStore<I extends WritableComparable, * Constructor. * @param conf Configuration * @param context Mapper context - * @param serviceWorker Service worker */ public SimplePartitionStore(ImmutableClassesGiraphConfiguration<I, V, E> conf, - Mapper<?, ?, ?, ?>.Context context, - CentralizedServiceWorker<I, V, E> serviceWorker) { - super(conf, context, serviceWorker); + Mapper<?, ?, ?, ?>.Context context) { + this.conf = conf; + this.context = context; } @Override @@ -111,6 +109,11 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override + public boolean isEmpty() { + return partitions.size() == 0; + } + + @Override public void startIteration() { checkState(partitionQueue == null || partitionQueue.isEmpty(), "startIteration: It seems that some of " + @@ -162,27 +165,8 @@ public class SimplePartitionStore<I extends WritableComparable, } @Override - public void addPartitionEdges(Integer partitionId, - VertexIdEdges<I, E> edges) { - edgeStore.addPartitionEdges(partitionId, edges); - } + public void shutdown() { } @Override - public void moveEdgesToVertices() { - edgeStore.moveEdgesToVertices(); - } - - @Override - public <M extends Writable> void addPartitionCurrentMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { - ((MessageStore<I, M>) currentMessageStore) - .addPartitionMessages(partitionId, messages); - } - - @Override - public <M extends Writable> void addPartitionIncomingMessages( - int partitionId, VertexIdMessages<I, M> messages) throws IOException { - ((MessageStore<I, M>) incomingMessageStore) - 
.addPartitionMessages(partitionId, messages); - } + public void initialize() { } }
