http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java new file mode 100644 index 0000000..b38f957 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StoreIncomingMessageIOCommand.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.command; + +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.data.DiskBackedMessageStore; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkState; + +/** + * IOCommand to store incoming message of a particular partition. 
+ */ +public class StoreIncomingMessageIOCommand extends IOCommand { + /** + * Constructor + * + * @param oocEngine out-of-core engine + * @param partitionId id of the partition to store its incoming messages + */ + public StoreIncomingMessageIOCommand(OutOfCoreEngine oocEngine, + int partitionId) { + super(oocEngine, partitionId); + } + + @Override + public boolean execute() throws IOException { + boolean executed = false; + if (oocEngine.getMetaPartitionManager() + .startOffloadingMessages(partitionId)) { + DiskBackedMessageStore messageStore = + (DiskBackedMessageStore) + oocEngine.getServerData().getIncomingMessageStore(); + checkState(messageStore != null); + numBytesTransferred += + messageStore.offloadPartitionData(partitionId); + oocEngine.getMetaPartitionManager().doneOffloadingMessages(partitionId); + executed = true; + } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_MESSAGE; + } + + @Override + public String toString() { + return "StoreIncomingMessageIOCommand: (partitionId = " + partitionId + ")"; + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java new file mode 100644 index 0000000..31fa345 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/command/StorePartitionIOCommand.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.ooc.command; + +import org.apache.giraph.bsp.BspService; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.ooc.data.DiskBackedEdgeStore; +import org.apache.giraph.ooc.data.DiskBackedMessageStore; +import org.apache.giraph.ooc.data.DiskBackedPartitionStore; +import org.apache.giraph.ooc.OutOfCoreEngine; + +import java.io.IOException; + +/** + * IOCommand to store partition data, edge data (if in INPUT_SUPERSTEP), and + * message data (if in compute supersteps). 
+ */ +public class StorePartitionIOCommand extends IOCommand { + /** + * Constructor + * + * @param oocEngine out-of-core engine + * @param partitionId id of the partition to store its data + */ + public StorePartitionIOCommand(OutOfCoreEngine oocEngine, + int partitionId) { + super(oocEngine, partitionId); + } + + @Override + public boolean execute() throws IOException { + boolean executed = false; + if (oocEngine.getMetaPartitionManager() + .startOffloadingPartition(partitionId)) { + DiskBackedPartitionStore partitionStore = + (DiskBackedPartitionStore) + oocEngine.getServerData().getPartitionStore(); + numBytesTransferred += + partitionStore.offloadPartitionData(partitionId); + if (oocEngine.getSuperstep() != BspService.INPUT_SUPERSTEP) { + MessageStore messageStore = + oocEngine.getServerData().getCurrentMessageStore(); + if (messageStore != null) { + numBytesTransferred += ((DiskBackedMessageStore) messageStore) + .offloadPartitionData(partitionId); + } + } else { + DiskBackedEdgeStore edgeStore = + (DiskBackedEdgeStore) + oocEngine.getServerData().getEdgeStore(); + numBytesTransferred += + edgeStore.offloadPartitionData(partitionId); + } + oocEngine.getMetaPartitionManager().doneOffloadingPartition(partitionId); + executed = true; + } + return executed; + } + + @Override + public IOCommandType getType() { + return IOCommandType.STORE_PARTITION; + } + + @Override + public String toString() { + return "StorePartitionIOCommand: (partitionId = " + partitionId + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/command/WaitIOCommand.java new file mode 100644 index 0000000..83540c1 --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.giraph.ooc.command;

import org.apache.giraph.ooc.OutOfCoreEngine;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * IOCommand to do nothing regarding moving data to/from disk. It simply
 * keeps the IO thread idle for a fixed duration.
 */
public class WaitIOCommand extends IOCommand {
  /** How long should the disk be idle? (in milliseconds) */
  private final long waitDuration;

  /**
   * Constructor
   *
   * @param oocEngine out-of-core engine
   * @param waitDuration duration of wait (in milliseconds)
   */
  public WaitIOCommand(OutOfCoreEngine oocEngine, long waitDuration) {
    // -1 is used as the partition id since this command is not associated
    // with any particular partition.
    super(oocEngine, -1);
    this.waitDuration = waitDuration;
  }

  /**
   * Sleeps for the configured duration.
   *
   * @return always true (the wait is always "executed")
   * @throws IOException never thrown; declared for interface compatibility
   */
  @Override
  public boolean execute() throws IOException {
    try {
      TimeUnit.MILLISECONDS.sleep(waitDuration);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers up the stack can still
      // observe the interruption, and keep the original exception as the
      // cause for diagnosis.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("execute: caught " +
          "InterruptedException while IO thread is waiting!", e);
    }
    return true;
  }

  @Override
  public IOCommandType getType() {
    return IOCommandType.WAIT;
  }

  @Override
  public String toString() {
    return "WaitIOCommand: (duration = " + waitDuration + "ms)";
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of classes related to IO commands in out-of-core mechanism + */ +package org.apache.giraph.ooc.command; http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java new file mode 100644 index 0000000..7265410 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedDataStore.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.ooc.data; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.persistence.DataIndex; +import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; +import org.apache.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.giraph.conf.GiraphConstants.ONE_MB; + +/** + * This class provides basic operations for data structures that have to + * participate in out-of-core mechanism. Essential subclasses of this class are: + * - DiskBackedPartitionStore (for partition data) + * - DiskBackedMessageStore (for messages) + * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP) + * Basically, any data structure that may cause OOM to happen can be implemented + * as a subclass of this class. + * + * There are two different terms used in the rest of this class: + * - "data store" refers to in-memory representation of data. Usually this is + * stored per-partition in in-memory implementations of data structures. 
For + * instance, "data store" of a DiskBackedPartitionStore would collection of + * all partitions kept in the in-memory partition store within the + * DiskBackedPartitionStore. + * - "raw data buffer" refers to raw data which were supposed to be + * de-serialized and added to the data store, but they remain 'as is' in the + * memory because their corresponding partition is offloaded to disk and is + * not available in the data store. + * + * @param <T> raw data format of the data store subclassing this class + */ +public abstract class DiskBackedDataStore<T> { + /** + * Minimum size of a buffer (in bytes) to flush to disk. This is used to + * decide whether vertex/edge buffers are large enough to flush to disk. + */ + public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = + new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, + "Minimum size of a buffer (in bytes) to flush to disk."); + + /** Class logger. */ + private static final Logger LOG = Logger.getLogger( + DiskBackedDataStore.class); + /** Out-of-core engine */ + protected final OutOfCoreEngine oocEngine; + /** + * Set containing ids of all partitions where the partition data is in some + * file on disk. + * Note that the out-of-core mechanism may decide to put the data for a + * partition on disk, while the partition data is empty. For instance, at the + * beginning of a superstep, out-of-core mechanism may decide to put incoming + * messages of a partition on disk, while the partition has not received any + * messages. In such scenarios, the "out-of-core mechanism" thinks that the + * partition data is on disk, while disk-backed data stores may want to + * optimize for IO/metadata accesses and decide not to create/write anything + * on files on disk. + * In summary, there is a subtle difference between this field and + * `hasPartitionOnDisk` field. 
Basically, this field is used for optimizing + * IO (mainly metadata) accesses by disk-backed stores, while + * `hasPartitionDataOnDisk` is the view that out-of-core mechanism has + * regarding partition storage statuses. Since out-of-core mechanism does not + * know about the actual data for a partition, these two fields have to be + * separate. + */ + protected final Set<Integer> hasPartitionDataOnFile = + Sets.newConcurrentHashSet(); + /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ + private final int minBufferSizeToOffload; + /** Set containing ids of all out-of-core partitions */ + private final Set<Integer> hasPartitionDataOnDisk = + Sets.newConcurrentHashSet(); + /** + * Map of partition ids to list of raw data buffers. The map will have entries + * only for partitions that their in-memory data structures are currently + * offloaded to disk. We keep the aggregate size of buffers for each partition + * as part of the values in the map to estimate how much memory we can free up + * if we offload data buffers of a particular partition to disk. + */ + private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = + Maps.newConcurrentMap(); + /** + * Map of partition ids to number of raw data buffers offloaded to disk for + * each partition. The map will have entries only for partitions that their + * in-memory data structures are currently out of core. It is necessary to + * know the number of data buffers on disk for a particular partition when we + * are loading all these buffers back in memory. + */ + private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = + Maps.newConcurrentMap(); + /** + * Lock to avoid overlapping of read and write on data associated with each + * partition. + * */ + private final ConcurrentMap<Integer, ReadWriteLock> locks = + Maps.newConcurrentMap(); + + /** + * Constructor. 
+ * + * @param conf Configuration + * @param oocEngine Out-of-core engine + */ + DiskBackedDataStore(ImmutableClassesGiraphConfiguration conf, + OutOfCoreEngine oocEngine) { + this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); + this.oocEngine = oocEngine; + } + + /** + * Retrieves a lock for a given partition. If the lock for the given partition + * does not exist, creates a new lock. + * + * @param partitionId id of the partition the lock is needed for + * @return lock for a given partition + */ + private ReadWriteLock getPartitionLock(int partitionId) { + ReadWriteLock readWriteLock = locks.get(partitionId); + if (readWriteLock == null) { + readWriteLock = new ReentrantReadWriteLock(); + ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock); + if (temp != null) { + readWriteLock = temp; + } + } + return readWriteLock; + } + + /** + * Adds a data entry for a given partition to the current data store. If data + * of a given partition in data store is already offloaded to disk, adds the + * data entry to appropriate raw data buffer list. + * + * @param partitionId id of the partition to add the data entry to + * @param entry data entry to add + */ + protected void addEntry(int partitionId, T entry) { + // Addition of data entries to a data store is much more common than + // out-of-core operations. Besides, in-memory data store implementations + // existing in the code base already account for parallel addition to data + // stores. Therefore, using read lock would optimize for parallel addition + // to data stores, specially for cases where the addition should happen for + // partitions that are entirely in memory. 
+ ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.readLock().lock(); + if (hasPartitionDataOnDisk.contains(partitionId)) { + List<T> entryList = new ArrayList<>(); + entryList.add(entry); + int entrySize = entrySerializedSize(entry); + MutablePair<Integer, List<T>> newPair = + new MutablePair<>(entrySize, entryList); + Pair<Integer, List<T>> oldPair = + dataBuffers.putIfAbsent(partitionId, newPair); + if (oldPair != null) { + synchronized (oldPair) { + newPair = (MutablePair<Integer, List<T>>) oldPair; + newPair.setLeft(oldPair.getLeft() + entrySize); + newPair.getRight().add(entry); + } + } + } else { + addEntryToInMemoryPartitionData(partitionId, entry); + } + rwLock.readLock().unlock(); + } + + /** + * Loads and assembles all data for a given partition, and put it into the + * data store. Returns the number of bytes transferred from disk to memory in + * the loading process. + * + * @param partitionId id of the partition to load and assemble all data for + * @return number of bytes loaded from disk to memory + * @throws IOException + */ + public abstract long loadPartitionData(int partitionId) throws IOException; + + /** + * The proxy method that does the actual operation for `loadPartitionData`, + * but uses the data index given by the caller. 
+ * + * @param partitionId id of the partition to load and assemble all data for + * @param index data index chain for the data to load + * @return number of bytes loaded from disk to memory + * @throws IOException + */ + protected long loadPartitionDataProxy(int partitionId, DataIndex index) + throws IOException { + long numBytes = 0; + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + if (hasPartitionDataOnDisk.contains(partitionId)) { + int ioThreadId = + oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); + numBytes += loadInMemoryPartitionData(partitionId, ioThreadId, + index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); + hasPartitionDataOnDisk.remove(partitionId); + // Loading raw data buffers from disk if there is any and applying those + // to already loaded in-memory data. + Integer numBuffers = numDataBuffersOnDisk.remove(partitionId); + if (numBuffers != null) { + checkState(numBuffers > 0); + index.addIndex(DataIndex.TypeIndexEntry.BUFFER); + OutOfCoreDataAccessor.DataInputWrapper inputWrapper = + oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); + for (int i = 0; i < numBuffers; ++i) { + T entry = readNextEntry(inputWrapper.getDataInput()); + addEntryToInMemoryPartitionData(partitionId, entry); + } + numBytes += inputWrapper.finalizeInput(true); + index.removeLastIndex(); + } + index.removeLastIndex(); + // Applying in-memory raw data buffers to in-memory partition data. + Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId); + if (pair != null) { + for (T entry : pair.getValue()) { + addEntryToInMemoryPartitionData(partitionId, entry); + } + } + } + rwLock.writeLock().unlock(); + return numBytes; + } + + /** + * Offloads partition data of a given partition in the data store to disk, and + * returns the number of bytes offloaded from memory to disk. 
+ * + * @param partitionId id of the partition to offload its data + * @return number of bytes offloaded from memory to disk + * @throws IOException + */ + public abstract long offloadPartitionData(int partitionId) throws IOException; + + /** + * The proxy method that does the actual operation for `offloadPartitionData`, + * but uses the data index given by the caller. + * + * @param partitionId id of the partition to offload its data + * @param index data index chain for the data to offload + * @return number of bytes offloaded from memory to disk + * @throws IOException + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings( + "UL_UNRELEASED_LOCK_EXCEPTION_PATH") + protected long offloadPartitionDataProxy( + int partitionId, DataIndex index) throws IOException { + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + hasPartitionDataOnDisk.add(partitionId); + rwLock.writeLock().unlock(); + int ioThreadId = + oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); + long numBytes = offloadInMemoryPartitionData(partitionId, ioThreadId, + index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))); + index.removeLastIndex(); + return numBytes; + } + + /** + * Offloads raw data buffers of a given partition to disk, and returns the + * number of bytes offloaded from memory to disk. + * + * @param partitionId id of the partition to offload its raw data buffers + * @return number of bytes offloaded from memory to disk + * @throws IOException + */ + public abstract long offloadBuffers(int partitionId) throws IOException; + + /** + * The proxy method that does the actual operation for `offloadBuffers`, + * but uses the data index given by the caller. 
+ * + * @param partitionId id of the partition to offload its raw data buffers + * @param index data index chain for the data to offload its buffers + * @return number of bytes offloaded from memory to disk + * @throws IOException + */ + protected long offloadBuffersProxy(int partitionId, DataIndex index) + throws IOException { + Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); + if (pair == null || pair.getLeft() < minBufferSizeToOffload) { + return 0; + } + ReadWriteLock rwLock = getPartitionLock(partitionId); + rwLock.writeLock().lock(); + pair = dataBuffers.remove(partitionId); + rwLock.writeLock().unlock(); + checkNotNull(pair); + checkState(!pair.getRight().isEmpty()); + int ioThreadId = + oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId); + index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId)) + .addIndex(DataIndex.TypeIndexEntry.BUFFER); + OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = + oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(), + true); + for (T entry : pair.getRight()) { + writeEntry(entry, outputWrapper.getDataOutput()); + } + long numBytes = outputWrapper.finalizeOutput(); + index.removeLastIndex().removeLastIndex(); + int numBuffers = pair.getRight().size(); + Integer oldNumBuffersOnDisk = + numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); + if (oldNumBuffersOnDisk != null) { + numDataBuffersOnDisk.replace(partitionId, + oldNumBuffersOnDisk + numBuffers); + } + return numBytes; + } + + /** + * Looks through all partitions that their data is not in the data store (is + * offloaded to disk), and sees if any of them has enough raw data buffer in + * memory. If so, puts that partition in a list to return. 
+ * + * @return Set of partition ids of all partition raw buffers where the + * aggregate size of buffers are large enough and it is worth flushing + * those buffers to disk + */ + public Set<Integer> getCandidateBuffersToOffload() { + Set<Integer> result = new HashSet<>(); + for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : + dataBuffers.entrySet()) { + if (entry.getValue().getLeft() > minBufferSizeToOffload) { + result.add(entry.getKey()); + } + } + return result; + } + + /** + * Writes a single raw entry to a given output stream. + * + * @param entry entry to write to output + * @param out output stream to write the entry to + * @throws IOException + */ + protected abstract void writeEntry(T entry, DataOutput out) + throws IOException; + + /** + * Reads the next available raw entry from a given input stream. + * + * @param in input stream to read the entry from + * @return entry read from an input stream + * @throws IOException + */ + protected abstract T readNextEntry(DataInput in) throws IOException; + + /** + * Loads data of a partition into data store. Returns number of bytes loaded. + * + * @param partitionId id of the partition to load its data + * @param ioThreadId id of the IO thread performing the load + * @param index data index chain for the data to load + * @return number of bytes loaded from disk to memory + * @throws IOException + */ + protected abstract long loadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException; + + /** + * Offloads data of a partition in data store to disk. 
Returns the number of + * bytes offloaded to disk + * + * @param partitionId id of the partition to offload to disk + * @param ioThreadId id of the IO thread performing the offload + * @param index data index chain for the data to offload + * @return number of bytes offloaded from memory to disk + * @throws IOException + */ + protected abstract long offloadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException; + + /** + * Gets the size of a given entry in bytes. + * + * @param entry input entry to find its size + * @return size of given input entry in bytes + */ + protected abstract int entrySerializedSize(T entry); + + /** + * Adds a single entry for a given partition to the in-memory data store. + * + * @param partitionId id of the partition to add the data to + * @param entry input entry to add to the data store + */ + protected abstract void addEntryToInMemoryPartitionData(int partitionId, + T entry); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java index 53de52f..e727fbd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedEdgeStore.java @@ -21,25 +21,18 @@ package org.apache.giraph.ooc.data; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.EdgeStore; import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.persistence.DataIndex; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; import org.apache.giraph.utils.ByteArrayVertexIdEdges; import org.apache.giraph.utils.VertexIdEdges; import 
org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import static com.google.common.base.Preconditions.checkState; - /** * Implementation of an edge-store used for out-of-core mechanism. * @@ -49,7 +42,7 @@ import static com.google.common.base.Preconditions.checkState; */ public class DiskBackedEdgeStore<I extends WritableComparable, V extends Writable, E extends Writable> - extends OutOfCoreDataManager<VertexIdEdges<I, E>> + extends DiskBackedDataStore<VertexIdEdges<I, E>> implements EdgeStore<I, V, E> { /** Class logger. */ private static final Logger LOG = Logger.getLogger(DiskBackedEdgeStore.class); @@ -57,8 +50,6 @@ public class DiskBackedEdgeStore<I extends WritableComparable, private final EdgeStore<I, V, E> edgeStore; /** Configuration */ private final ImmutableClassesGiraphConfiguration<I, V, E> conf; - /** Out-of-core engine */ - private final OutOfCoreEngine oocEngine; /** * Constructor @@ -72,10 +63,9 @@ public class DiskBackedEdgeStore<I extends WritableComparable, EdgeStore<I, V, E> edgeStore, ImmutableClassesGiraphConfiguration<I, V, E> conf, OutOfCoreEngine oocEngine) { - super(conf); + super(conf, oocEngine); this.edgeStore = edgeStore; this.conf = conf; - this.oocEngine = oocEngine; } @Override @@ -114,32 +104,25 @@ public class DiskBackedEdgeStore<I extends WritableComparable, "should not be called for DiskBackedEdgeStore!"); } - /** - * Gets the path that should be used specifically for edge data. 
- * - * @param basePath path prefix to build the actual path from - * @return path to files specific for edge data - */ - private static String getPath(String basePath) { - return basePath + "_edge_store"; - } - @Override - public long loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId) throws IOException { - return super.loadPartitionData(partitionId, getPath(basePath)); + return loadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE)); } @Override - public long offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId) throws IOException { - return super.offloadPartitionData(partitionId, getPath(basePath)); + return offloadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE)); } @Override - public long offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId) throws IOException { - return super.offloadBuffers(partitionId, getPath(basePath)); + return offloadBuffersProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.EDGE_STORE)); } @Override @@ -157,44 +140,31 @@ public class DiskBackedEdgeStore<I extends WritableComparable, } @Override - protected long loadInMemoryPartitionData(int partitionId, String path) - throws IOException { + protected long loadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException { long numBytes = 0; - File file = new File(path); - if (file.exists()) { - if (LOG.isDebugEnabled()) { - LOG.debug("loadInMemoryPartitionData: loading edge data for " + - "partition " + partitionId + " from " + file.getAbsolutePath()); - } - FileInputStream fis = new FileInputStream(file); - BufferedInputStream bis = new BufferedInputStream(fis); - DataInputStream dis = new DataInputStream(bis); - edgeStore.readPartitionEdgeStore(partitionId, dis); - dis.close(); - numBytes = 
file.length(); - checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + - "%s.", file.getAbsoluteFile()); + if (hasPartitionDataOnFile.remove(partitionId)) { + OutOfCoreDataAccessor.DataInputWrapper inputWrapper = + oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); + edgeStore.readPartitionEdgeStore(partitionId, + inputWrapper.getDataInput()); + numBytes = inputWrapper.finalizeInput(true); } return numBytes; } @Override - protected long offloadInMemoryPartitionData(int partitionId, String path) - throws IOException { + protected long offloadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException { long numBytes = 0; if (edgeStore.hasEdgesForPartition(partitionId)) { - File file = new File(path); - checkState(!file.exists(), "offloadInMemoryPartitionData: edge store " + - "file %s already exist", file.getAbsoluteFile()); - checkState(file.createNewFile(), - "offloadInMemoryPartitionData: cannot create edge store file %s", - file.getAbsoluteFile()); - FileOutputStream fos = new FileOutputStream(file); - BufferedOutputStream bos = new BufferedOutputStream(fos); - DataOutputStream dos = new DataOutputStream(bos); - edgeStore.writePartitionEdgeStore(partitionId, dos); - dos.close(); - numBytes = dos.size(); + OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = + oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(), + false); + edgeStore.writePartitionEdgeStore(partitionId, + outputWrapper.getDataOutput()); + numBytes = outputWrapper.finalizeOutput(); + hasPartitionDataOnFile.add(partitionId); } return numBytes; } @@ -205,7 +175,7 @@ public class DiskBackedEdgeStore<I extends WritableComparable, } @Override - protected void addEntryToImMemoryPartitionData(int partitionId, + protected void addEntryToInMemoryPartitionData(int partitionId, VertexIdEdges<I, E> edges) { oocEngine.getMetaPartitionManager().addPartition(partitionId); edgeStore.addPartitionEdges(partitionId, edges); 
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java index 94ba83a..c8d0f79 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java @@ -21,6 +21,10 @@ package org.apache.giraph.ooc.data; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.persistence.DataIndex; +import org.apache.giraph.ooc.persistence.DataIndex.NumericIndexEntry; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; import org.apache.giraph.utils.ByteArrayOneMessageToManyIds; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.VertexIdMessages; @@ -28,19 +32,10 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import static com.google.common.base.Preconditions.checkState; - /** * Implementation of a message store used for out-of-core mechanism. 
* @@ -48,7 +43,7 @@ import static com.google.common.base.Preconditions.checkState; * @param <M> Message data */ public class DiskBackedMessageStore<I extends WritableComparable, - M extends Writable> extends OutOfCoreDataManager<VertexIdMessages<I, M>> + M extends Writable> extends DiskBackedDataStore<VertexIdMessages<I, M>> implements MessageStore<I, M> { /** Class logger. */ private static final Logger LOG = @@ -82,6 +77,7 @@ public class DiskBackedMessageStore<I extends WritableComparable, * Constructor * * @param config Configuration + * @param oocEngine Out-of-core engine * @param messageStore In-memory message store for which out-of-core message * store would be wrapper * @param useMessageCombiner Whether message combiner is used for this message @@ -90,9 +86,10 @@ public class DiskBackedMessageStore<I extends WritableComparable, */ public DiskBackedMessageStore(ImmutableClassesGiraphConfiguration<I, ?, ?> config, + OutOfCoreEngine oocEngine, MessageStore<I, M> messageStore, boolean useMessageCombiner, long superstep) { - super(config); + super(config, oocEngine); this.config = config; this.messageStore = messageStore; this.useMessageCombiner = useMessageCombiner; @@ -140,43 +137,38 @@ public class DiskBackedMessageStore<I extends WritableComparable, } } - /** - * Gets the path that should be used specifically for message data. 
- * - * @param basePath path prefix to build the actual path from - * @param superstep superstep for which message data should be stored - * @return path to files specific for message data - */ - private static String getPath(String basePath, long superstep) { - return basePath + "_messages-S" + superstep; - } @Override - public long loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId) throws IOException { if (!useMessageCombiner) { - return super.loadPartitionData(partitionId, getPath(basePath, superstep)); + return loadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE) + .addIndex(NumericIndexEntry.createSuperstepEntry(superstep))); } else { return 0; } } @Override - public long offloadPartitionData(int partitionId, String basePath) + public long offloadPartitionData(int partitionId) throws IOException { if (!useMessageCombiner) { - return - super.offloadPartitionData(partitionId, getPath(basePath, superstep)); + return offloadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE) + .addIndex(NumericIndexEntry.createSuperstepEntry(superstep))); } else { return 0; } } @Override - public long offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId) throws IOException { if (!useMessageCombiner) { - return super.offloadBuffers(partitionId, getPath(basePath, superstep)); + return offloadBuffersProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.MESSAGE) + .addIndex(NumericIndexEntry.createSuperstepEntry(superstep))); } else { return 0; } @@ -250,45 +242,31 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - protected long loadInMemoryPartitionData(int partitionId, String basePath) - throws IOException { + protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, + DataIndex index) throws IOException { long numBytes = 0; - File file = new 
File(basePath); - if (file.exists()) { - if (LOG.isDebugEnabled()) { - LOG.debug("loadInMemoryPartitionData: loading message data for " + - "partition " + partitionId + " from " + file.getAbsolutePath()); - } - FileInputStream fis = new FileInputStream(file); - BufferedInputStream bis = new BufferedInputStream(fis); - DataInputStream dis = new DataInputStream(bis); - messageStore.readFieldsForPartition(dis, partitionId); - dis.close(); - numBytes = file.length(); - checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + - "%s.", file.getAbsoluteFile()); + if (hasPartitionDataOnFile.remove(partitionId)) { + OutOfCoreDataAccessor.DataInputWrapper inputWrapper = + oocEngine.getDataAccessor().prepareInput(ioThreadId, index.copy()); + messageStore.readFieldsForPartition(inputWrapper.getDataInput(), + partitionId); + numBytes = inputWrapper.finalizeInput(true); } return numBytes; } @Override - protected long offloadInMemoryPartitionData(int partitionId, String basePath) - throws IOException { + protected long offloadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException { long numBytes = 0; if (messageStore.hasMessagesForPartition(partitionId)) { - File file = new File(basePath); - checkState(!file.exists(), "offloadInMemoryPartitionData: message store" + - " file %s already exist", file.getAbsoluteFile()); - checkState(file.createNewFile(), - "offloadInMemoryPartitionData: cannot create message store file %s", - file.getAbsoluteFile()); - FileOutputStream fileout = new FileOutputStream(file); - BufferedOutputStream bufferout = new BufferedOutputStream(fileout); - DataOutputStream outputStream = new DataOutputStream(bufferout); - messageStore.writePartition(outputStream, partitionId); + OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = + oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(), + false); + messageStore.writePartition(outputWrapper.getDataOutput(), partitionId); 
messageStore.clearPartition(partitionId); - outputStream.close(); - numBytes += outputStream.size(); + numBytes = outputWrapper.finalizeOutput(); + hasPartitionDataOnFile.add(partitionId); } return numBytes; } @@ -299,7 +277,7 @@ public class DiskBackedMessageStore<I extends WritableComparable, } @Override - protected void addEntryToImMemoryPartitionData(int partitionId, + protected void addEntryToInMemoryPartitionData(int partitionId, VertexIdMessages<I, M> messages) { messageStore.addPartitionMessages(partitionId, messages); http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java index 2a5e47a..6b7822f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java @@ -20,11 +20,12 @@ package org.apache.giraph.ooc.data; import com.google.common.collect.Maps; import org.apache.giraph.bsp.BspService; -import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.graph.Vertex; import org.apache.giraph.ooc.OutOfCoreEngine; +import org.apache.giraph.ooc.persistence.DataIndex; +import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.utils.ExtendedDataOutput; @@ -35,25 +36,17 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; -import java.io.BufferedInputStream; -import 
java.io.BufferedOutputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.util.Map; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; /** * Implementation of a partition-store used for out-of-core mechanism. * Partition store is responsible for partition data, as well as data buffers in - * INPUT_SUPERSTEP ("raw data buffer" -- defined in OutOfCoreDataManager -- + * INPUT_SUPERSTEP ("raw data buffer" -- defined in DiskBackedDataStore -- * refers to vertex buffers in INPUT_SUPERSTEP). * * @param <I> Vertex id @@ -62,7 +55,7 @@ import static com.google.common.base.Preconditions.checkState; */ public class DiskBackedPartitionStore<I extends WritableComparable, V extends Writable, E extends Writable> - extends OutOfCoreDataManager<ExtendedDataOutput> + extends DiskBackedDataStore<ExtendedDataOutput> implements PartitionStore<I, V, E> { /** Class logger. 
*/ private static final Logger LOG = @@ -71,10 +64,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable, private final ImmutableClassesGiraphConfiguration<I, V, E> conf; /** Job context (for progress) */ private final Mapper<?, ?, ?, ?>.Context context; - /** Service worker */ - private final CentralizedServiceWorker<I, V, E> serviceWorker; - /** Out-of-core engine */ - private final OutOfCoreEngine oocEngine; /** In-memory partition store */ private final PartitionStore<I, V, E> partitionStore; /** @@ -99,21 +88,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable, * partition store would be a wrapper * @param conf Configuration * @param context Job context - * @param serviceWorker Service worker * @param oocEngine Out-of-core engine */ public DiskBackedPartitionStore( PartitionStore<I, V, E> partitionStore, ImmutableClassesGiraphConfiguration<I, V, E> conf, Mapper<?, ?, ?, ?>.Context context, - CentralizedServiceWorker<I, V, E> serviceWorker, OutOfCoreEngine oocEngine) { - super(conf); + super(conf, oocEngine); this.partitionStore = partitionStore; this.conf = conf; this.context = context; - this.serviceWorker = serviceWorker; - this.oocEngine = oocEngine; } @Override @@ -222,36 +207,6 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } /** - * Gets the path that should be used specifically for partition data. - * - * @param basePath path prefix to build the actual path from - * @return path to files specific for partition data - */ - private static String getPath(String basePath) { - return basePath + "_partition"; - } - - /** - * Get the path to the file where vertices are stored. - * - * @param basePath path prefix to build the actual path from - * @return The path to the vertices file - */ - private static String getVerticesPath(String basePath) { - return basePath + "_vertices"; - } - - /** - * Get the path to the file where edges are stored. 
- * - * @param basePath path prefix to build the actual path from - * @return The path to the edges file - */ - private static String getEdgesPath(String basePath) { - return basePath + "_edges"; - } - - /** * Read vertex data from an input and initialize the vertex. * * @param in The input stream @@ -295,54 +250,42 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - protected long loadInMemoryPartitionData(int partitionId, String path) - throws IOException { + protected long loadInMemoryPartitionData(int partitionId, int ioThreadId, + DataIndex index) throws IOException { long numBytes = 0; // Load vertices - File file = new File(getVerticesPath(path)); - if (file.exists()) { + if (hasPartitionDataOnFile.remove(partitionId)) { Partition<I, V, E> partition = conf.createPartition(partitionId, context); - if (LOG.isDebugEnabled()) { - LOG.debug("loadInMemoryPartitionData: loading partition vertices " + - partitionId + " from " + file.getAbsolutePath()); - } - - FileInputStream fis = new FileInputStream(file); - BufferedInputStream bis = new BufferedInputStream(fis); - DataInputStream inputStream = new DataInputStream(bis); - long numVertices = inputStream.readLong(); + OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor(); + index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES); + OutOfCoreDataAccessor.DataInputWrapper inputWrapper = + dataAccessor.prepareInput(ioThreadId, index.copy()); + DataInput dataInput = inputWrapper.getDataInput(); + long numVertices = dataInput.readLong(); for (long i = 0; i < numVertices; ++i) { Vertex<I, V, E> vertex = conf.createVertex(); - readVertexData(inputStream, vertex); + readVertexData(dataInput, vertex); partition.putVertex(vertex); } - inputStream.close(); - numBytes += file.length(); - checkState(file.delete(), "loadInMemoryPartitionData: failed to delete " + - "%s", file.getAbsolutePath()); + numBytes += inputWrapper.finalizeInput(true); // Load edges - file = new 
File(getEdgesPath(path)); - if (LOG.isDebugEnabled()) { - LOG.debug("loadInMemoryPartitionData: loading partition edges " + - partitionId + " from " + file.getAbsolutePath()); - } - - fis = new FileInputStream(file); - bis = new BufferedInputStream(fis); - inputStream = new DataInputStream(bis); + index.removeLastIndex() + .addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES); + inputWrapper = dataAccessor.prepareInput(ioThreadId, index.copy()); + dataInput = inputWrapper.getDataInput(); for (int i = 0; i < numVertices; ++i) { - readOutEdges(inputStream, partition); + readOutEdges(dataInput, partition); } - inputStream.close(); - numBytes += file.length(); // If the graph is static and it is not INPUT_SUPERSTEP, keep the file // around. + boolean shouldDeleteEdges = false; if (!conf.isStaticGraph() || oocEngine.getSuperstep() == BspService.INPUT_SUPERSTEP) { - checkState(file.delete(), "loadPartition: failed to delete %s", - file.getAbsolutePath()); + shouldDeleteEdges = true; } + numBytes += inputWrapper.finalizeInput(shouldDeleteEdges); + index.removeLastIndex(); partitionStore.addPartition(partition); } return numBytes; @@ -354,7 +297,7 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - protected void addEntryToImMemoryPartitionData(int partitionId, + protected void addEntryToInMemoryPartitionData(int partitionId, ExtendedDataOutput vertices) { if (!partitionStore.hasPartition(partitionId)) { oocEngine.getMetaPartitionManager().addPartition(partitionId); @@ -363,15 +306,17 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - public long loadPartitionData(int partitionId, String basePath) + public long loadPartitionData(int partitionId) throws IOException { - return super.loadPartitionData(partitionId, getPath(basePath)); + return loadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); } @Override - public long offloadPartitionData(int partitionId, String 
basePath) + public long offloadPartitionData(int partitionId) throws IOException { - return super.offloadPartitionData(partitionId, getPath(basePath)); + return offloadPartitionDataProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); } /** @@ -409,61 +354,44 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - protected long offloadInMemoryPartitionData(int partitionId, String path) - throws IOException { + protected long offloadInMemoryPartitionData( + int partitionId, int ioThreadId, DataIndex index) throws IOException { long numBytes = 0; if (partitionStore.hasPartition(partitionId)) { + OutOfCoreDataAccessor dataAccessor = oocEngine.getDataAccessor(); partitionVertexCount.put(partitionId, partitionStore.getPartitionVertexCount(partitionId)); partitionEdgeCount.put(partitionId, partitionStore.getPartitionEdgeCount(partitionId)); Partition<I, V, E> partition = partitionStore.removePartition(partitionId); - File file = new File(getVerticesPath(path)); - if (LOG.isDebugEnabled()) { - LOG.debug("offloadInMemoryPartitionData: writing partition vertices " + - partitionId + " to " + file.getAbsolutePath()); - } - checkState(!file.exists(), "offloadInMemoryPartitionData: partition " + - "store file %s already exist", file.getAbsoluteFile()); - checkState(file.createNewFile(), - "offloadInMemoryPartitionData: file %s already exists.", - file.getAbsolutePath()); - - FileOutputStream fileout = new FileOutputStream(file); - BufferedOutputStream bufferout = new BufferedOutputStream(fileout); - DataOutputStream outputStream = new DataOutputStream(bufferout); - outputStream.writeLong(partition.getVertexCount()); + index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES); + OutOfCoreDataAccessor.DataOutputWrapper outputWrapper = + dataAccessor.prepareOutput(ioThreadId, index.copy(), false); + DataOutput dataOutput = outputWrapper.getDataOutput(); + dataOutput.writeLong(partition.getVertexCount()); for (Vertex<I, 
V, E> vertex : partition) { - writeVertexData(outputStream, vertex); + writeVertexData(dataOutput, vertex); } - outputStream.close(); - numBytes += outputStream.size(); - + numBytes += outputWrapper.finalizeOutput(); + index.removeLastIndex(); // Avoid writing back edges if we have already written them once and // the graph is not changing. // If we are in the input superstep, we need to write the files // at least the first time, even though the graph is static. - file = new File(getEdgesPath(path)); + index.addIndex(DataIndex.TypeIndexEntry.PARTITION_EDGES); if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP || - partitionVertexCount.get(partitionId) == null || - partitionVertexCount.get(partitionId) != partition.getVertexCount() || - !conf.isStaticGraph() || !file.exists()) { - checkState(file.createNewFile(), "offloadInMemoryPartitionData: file " + - "%s already exists.", file.getAbsolutePath()); - if (LOG.isDebugEnabled()) { - LOG.debug("offloadInMemoryPartitionData: writing partition edges " + - partitionId + " to " + file.getAbsolutePath()); - } - fileout = new FileOutputStream(file); - bufferout = new BufferedOutputStream(fileout); - outputStream = new DataOutputStream(bufferout); + !conf.isStaticGraph() || + !dataAccessor.dataExist(ioThreadId, index)) { + outputWrapper = dataAccessor.prepareOutput(ioThreadId, index.copy(), + false); for (Vertex<I, V, E> vertex : partition) { - writeOutEdges(outputStream, vertex); + writeOutEdges(outputWrapper.getDataOutput(), vertex); } - outputStream.close(); - numBytes += outputStream.size(); + numBytes += outputWrapper.finalizeOutput(); } + index.removeLastIndex(); + hasPartitionDataOnFile.add(partitionId); } return numBytes; } @@ -475,9 +403,10 @@ public class DiskBackedPartitionStore<I extends WritableComparable, } @Override - public long offloadBuffers(int partitionId, String basePath) + public long offloadBuffers(int partitionId) throws IOException { - return super.offloadBuffers(partitionId, 
getPath(basePath)); + return offloadBuffersProxy(partitionId, + new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION)); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java index 1332a3a..64e3aed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -99,6 +99,21 @@ public class MetaPartitionManager { */ private final AtomicDouble lowestGraphFractionInMemory = new AtomicDouble(1); + /** + * Map of partition ids to their indices. index of a partition is the order + * with which the partition has been inserted. Partitions are indexed as 0, 1, + * 2, etc. This indexing is later used to find the id of the IO thread who is + * responsible for handling a partition. Partitions are assigned to IO threads + * in a round-robin fashion based on their indices. + */ + private final ConcurrentMap<Integer, Integer> partitionIndex = + Maps.newConcurrentMap(); + /** + * Sequential counter used to assign indices to partitions as they are added + */ + private final AtomicInteger indexCounter = new AtomicInteger(0); + /** How many disks (i.e. IO threads) do we have? 
*/ + private final int numIOThreads; /** * Constructor @@ -117,6 +132,7 @@ public class MetaPartitionManager { } this.oocEngine = oocEngine; this.randomGenerator = new Random(); + this.numIOThreads = numIOThreads; } /** @@ -131,7 +147,7 @@ public class MetaPartitionManager { /** * Get total number of partitions * - * @return total number of partition + * @return total number of partitions */ public int getNumPartitions() { return partitions.size(); @@ -175,6 +191,18 @@ public class MetaPartitionManager { } /** + * Get the thread id that is responsible for a particular partition + * + * @param partitionId id of the given partition + * @return id of the thread responsible for the given partition + */ + public int getOwnerThreadId(int partitionId) { + Integer index = partitionIndex.get(partitionId); + checkState(index != null); + return index % numIOThreads; + } + + /** * Add a partition * * @param partitionId id of a partition to add @@ -184,8 +212,9 @@ public class MetaPartitionManager { MetaPartition temp = partitions.putIfAbsent(partitionId, meta); // Check if the given partition is new if (temp == null) { - int ownerThread = oocEngine.getIOScheduler() - .getOwnerThreadId(partitionId); + int index = indexCounter.getAndIncrement(); + checkState(partitionIndex.putIfAbsent(partitionId, index) == null); + int ownerThread = getOwnerThreadId(partitionId); perThreadPartitionDictionary.get(ownerThread).addPartition(meta); numInMemoryPartitions.getAndIncrement(); } @@ -199,7 +228,7 @@ public class MetaPartitionManager { */ public void removePartition(Integer partitionId) { MetaPartition meta = partitions.remove(partitionId); - int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int ownerThread = getOwnerThreadId(partitionId); perThreadPartitionDictionary.get(ownerThread).removePartition(meta); checkState(!meta.isOnDisk()); numInMemoryPartitions.getAndDecrement(); @@ -424,7 +453,7 @@ public class MetaPartitionManager { */ public void 
markPartitionAsInProcess(int partitionId) { MetaPartition meta = partitions.get(partitionId); - int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int ownerThread = getOwnerThreadId(partitionId); synchronized (meta) { perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setProcessingState(ProcessingState.IN_PROCESS); @@ -468,7 +497,7 @@ public class MetaPartitionManager { */ public void setPartitionIsProcessed(int partitionId) { MetaPartition meta = partitions.get(partitionId); - int ownerThread = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int ownerThread = getOwnerThreadId(partitionId); synchronized (meta) { perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setProcessingState(ProcessingState.PROCESSED); @@ -508,7 +537,7 @@ public class MetaPartitionManager { public void doneLoadingPartition(int partitionId, long superstep) { MetaPartition meta = partitions.get(partitionId); numInMemoryPartitions.getAndIncrement(); - int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int owner = getOwnerThreadId(partitionId); synchronized (meta) { perThreadPartitionDictionary.get(owner).removePartition(meta); meta.setPartitionState(StorageState.IN_MEM); @@ -535,8 +564,7 @@ public class MetaPartitionManager { */ public boolean startOffloadingMessages(int partitionId) { MetaPartition meta = partitions.get(partitionId); - int ownerThread = - oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int ownerThread = getOwnerThreadId(partitionId); synchronized (meta) { if (meta.getIncomingMessagesState() == StorageState.IN_MEM) { perThreadPartitionDictionary.get(ownerThread).removePartition(meta); @@ -558,8 +586,7 @@ public class MetaPartitionManager { */ public void doneOffloadingMessages(int partitionId) { MetaPartition meta = partitions.get(partitionId); - int ownerThread = - oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int ownerThread = 
getOwnerThreadId(partitionId); synchronized (meta) { perThreadPartitionDictionary.get(ownerThread).removePartition(meta); meta.setIncomingMessagesState(StorageState.ON_DISK); @@ -598,7 +625,7 @@ public class MetaPartitionManager { */ public boolean startOffloadingPartition(int partitionId) { MetaPartition meta = partitions.get(partitionId); - int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int owner = getOwnerThreadId(partitionId); synchronized (meta) { if (meta.getProcessingState() != ProcessingState.IN_PROCESS && (meta.getPartitionState() == StorageState.IN_MEM || @@ -624,7 +651,7 @@ public class MetaPartitionManager { numInMemoryPartitions.getAndDecrement(); updateGraphFractionInMemory(); MetaPartition meta = partitions.get(partitionId); - int owner = oocEngine.getIOScheduler().getOwnerThreadId(partitionId); + int owner = getOwnerThreadId(partitionId); synchronized (meta) { perThreadPartitionDictionary.get(owner).removePartition(meta); meta.setPartitionState(StorageState.ON_DISK); @@ -639,8 +666,7 @@ public class MetaPartitionManager { */ public void resetPartitions() { for (MetaPartition meta : partitions.values()) { - int owner = - oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId()); + int owner = getOwnerThreadId(meta.getPartitionId()); perThreadPartitionDictionary.get(owner).removePartition(meta); meta.resetPartition(); perThreadPartitionDictionary.get(owner).addPartition(meta); @@ -659,8 +685,7 @@ public class MetaPartitionManager { */ public void resetMessages() { for (MetaPartition meta : partitions.values()) { - int owner = - oocEngine.getIOScheduler().getOwnerThreadId(meta.getPartitionId()); + int owner = getOwnerThreadId(meta.getPartitionId()); perThreadPartitionDictionary.get(owner).removePartition(meta); meta.resetMessages(); if (meta.getPartitionState() == StorageState.IN_MEM && 
http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java deleted file mode 100644 index 325850c..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/OutOfCoreDataManager.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.giraph.ooc.data; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.conf.IntConfOption; -import org.apache.log4j.Logger; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static org.apache.giraph.conf.GiraphConstants.ONE_MB; - -/** - * This class provides basic operations for data structures that have to - * participate in out-of-core mechanism. Essential subclasses of this class are: - * - DiskBackedPartitionStore (for partition data) - * - DiskBackedMessageStore (for messages) - * - DiskBackedEdgeStore (for edges read in INPUT_SUPERSTEP) - * Basically, any data structure that may cause OOM to happen can be implemented - * as a subclass of this class. - * - * There are two different terms used in the rest of this class: - * - "data store" refers to in-memory representation of data. Usually this is - * stored per-partition in in-memory implementations of data structures. For - * instance, "data store" of a DiskBackedPartitionStore would collection of - * all partitions kept in the in-memory partition store within the - * DiskBackedPartitionStore. 
- * - "raw data buffer" refers to raw data which were supposed to be - * de-serialized and added to the data store, but they remain 'as is' in the - * memory because their corresponding partition is offloaded to disk and is - * not available in the data store. - * - * @param <T> raw data format of the data store subclassing this class - */ -public abstract class OutOfCoreDataManager<T> { - /** - * Minimum size of a buffer (in bytes) to flush to disk. This is used to - * decide whether vertex/edge buffers are large enough to flush to disk. - */ - public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH = - new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB, - "Minimum size of a buffer (in bytes) to flush to disk."); - - /** Class logger. */ - private static final Logger LOG = Logger.getLogger( - OutOfCoreDataManager.class); - /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */ - private final int minBufferSizeToOffload; - /** Set containing ids of all out-of-core partitions */ - private final Set<Integer> hasPartitionDataOnDisk = - Sets.newConcurrentHashSet(); - /** - * Map of partition ids to list of raw data buffers. The map will have entries - * only for partitions that their in-memory data structures are currently - * offloaded to disk. We keep the aggregate size of buffers for each partition - * as part of the values in the map to estimate how much memory we can free up - * if we offload data buffers of a particular partition to disk. - */ - private final ConcurrentMap<Integer, Pair<Integer, List<T>>> dataBuffers = - Maps.newConcurrentMap(); - /** - * Map of partition ids to number of raw data buffers offloaded to disk for - * each partition. The map will have entries only for partitions that their - * in-memory data structures are currently out of core. It is necessary to - * know the number of data buffers on disk for a particular partition when we - * are loading all these buffers back in memory. 
- */ - private final ConcurrentMap<Integer, Integer> numDataBuffersOnDisk = - Maps.newConcurrentMap(); - /** - * Lock to avoid overlapping of read and write on data associated with each - * partition. - * */ - private final ConcurrentMap<Integer, ReadWriteLock> locks = - Maps.newConcurrentMap(); - - /** - * Constructor. - * - * @param conf Configuration - */ - OutOfCoreDataManager(ImmutableClassesGiraphConfiguration conf) { - this.minBufferSizeToOffload = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf); - } - - /** - * Retrieves a lock for a given partition. If the lock for the given partition - * does not exist, creates a new lock. - * - * @param partitionId id of the partition the lock is needed for - * @return lock for a given partition - */ - private ReadWriteLock getPartitionLock(int partitionId) { - ReadWriteLock readWriteLock = locks.get(partitionId); - if (readWriteLock == null) { - readWriteLock = new ReentrantReadWriteLock(); - ReadWriteLock temp = locks.putIfAbsent(partitionId, readWriteLock); - if (temp != null) { - readWriteLock = temp; - } - } - return readWriteLock; - } - - /** - * Adds a data entry for a given partition to the current data store. If data - * of a given partition in data store is already offloaded to disk, adds the - * data entry to appropriate raw data buffer list. - * - * @param partitionId id of the partition to add the data entry to - * @param entry data entry to add - */ - protected void addEntry(int partitionId, T entry) { - // Addition of data entries to a data store is much more common than - // out-of-core operations. Besides, in-memory data store implementations - // existing in the code base already account for parallel addition to data - // stores. Therefore, using read lock would optimize for parallel addition - // to data stores, specially for cases where the addition should happen for - // partitions that are entirely in memory. 
- ReadWriteLock rwLock = getPartitionLock(partitionId); - rwLock.readLock().lock(); - if (hasPartitionDataOnDisk.contains(partitionId)) { - List<T> entryList = new ArrayList<>(); - entryList.add(entry); - int entrySize = entrySerializedSize(entry); - MutablePair<Integer, List<T>> newPair = - new MutablePair<>(entrySize, entryList); - Pair<Integer, List<T>> oldPair = - dataBuffers.putIfAbsent(partitionId, newPair); - if (oldPair != null) { - synchronized (oldPair) { - newPair = (MutablePair<Integer, List<T>>) oldPair; - newPair.setLeft(oldPair.getLeft() + entrySize); - newPair.getRight().add(entry); - } - } - } else { - addEntryToImMemoryPartitionData(partitionId, entry); - } - rwLock.readLock().unlock(); - } - - /** - * Loads and assembles all data for a given partition, and put it into the - * data store. Returns the number of bytes transferred from disk to memory in - * the loading process. - * - * @param partitionId id of the partition to load ana assemble all data for - * @param basePath path to load the data from - * @return number of bytes loaded from disk to memory - * @throws IOException - */ - public long loadPartitionData(int partitionId, String basePath) - throws IOException { - long numBytes = 0; - ReadWriteLock rwLock = getPartitionLock(partitionId); - rwLock.writeLock().lock(); - if (hasPartitionDataOnDisk.contains(partitionId)) { - numBytes += loadInMemoryPartitionData(partitionId, - getPath(basePath, partitionId)); - hasPartitionDataOnDisk.remove(partitionId); - // Loading raw data buffers from disk if there is any and applying those - // to already loaded in-memory data. 
- Integer numBuffers = numDataBuffersOnDisk.remove(partitionId); - if (numBuffers != null) { - checkState(numBuffers > 0); - File file = new File(getBuffersPath(basePath, partitionId)); - checkState(file.exists()); - if (LOG.isDebugEnabled()) { - LOG.debug("loadPartitionData: loading " + numBuffers + " buffers of" + - " partition " + partitionId + " from " + file.getAbsolutePath()); - } - FileInputStream fis = new FileInputStream(file); - BufferedInputStream bis = new BufferedInputStream(fis); - DataInputStream dis = new DataInputStream(bis); - for (int i = 0; i < numBuffers; ++i) { - T entry = readNextEntry(dis); - addEntryToImMemoryPartitionData(partitionId, entry); - } - dis.close(); - numBytes += file.length(); - checkState(file.delete(), "loadPartitionData: failed to delete %s.", - file.getAbsoluteFile()); - } - // Applying in-memory raw data buffers to in-memory partition data. - Pair<Integer, List<T>> pair = dataBuffers.remove(partitionId); - if (pair != null) { - for (T entry : pair.getValue()) { - addEntryToImMemoryPartitionData(partitionId, entry); - } - } - } - rwLock.writeLock().unlock(); - return numBytes; - } - - /** - * Offloads partition data of a given partition in the data store to disk, and - * returns the number of bytes offloaded from memory to disk. 
- * - * @param partitionId id of the partition to offload its data - * @param basePath path to offload the data to - * @return number of bytes offloaded from memory to disk - * @throws IOException - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - "UL_UNRELEASED_LOCK_EXCEPTION_PATH") - public long offloadPartitionData(int partitionId, String basePath) - throws IOException { - ReadWriteLock rwLock = getPartitionLock(partitionId); - rwLock.writeLock().lock(); - hasPartitionDataOnDisk.add(partitionId); - rwLock.writeLock().unlock(); - return offloadInMemoryPartitionData(partitionId, - getPath(basePath, partitionId)); - } - - /** - * Offloads raw data buffers of a given partition to disk, and returns the - * number of bytes offloaded from memory to disk. - * - * @param partitionId id of the partition to offload its raw data buffers - * @param basePath path to offload the data to - * @return number of bytes offloaded from memory to disk - * @throws IOException - */ - public long offloadBuffers(int partitionId, String basePath) - throws IOException { - Pair<Integer, List<T>> pair = dataBuffers.get(partitionId); - if (pair == null || pair.getLeft() < minBufferSizeToOffload) { - return 0; - } - ReadWriteLock rwLock = getPartitionLock(partitionId); - rwLock.writeLock().lock(); - pair = dataBuffers.remove(partitionId); - rwLock.writeLock().unlock(); - checkNotNull(pair); - checkState(!pair.getRight().isEmpty()); - File file = new File(getBuffersPath(basePath, partitionId)); - FileOutputStream fos = new FileOutputStream(file, true); - BufferedOutputStream bos = new BufferedOutputStream(fos); - DataOutputStream dos = new DataOutputStream(bos); - for (T entry : pair.getRight()) { - writeEntry(entry, dos); - } - dos.close(); - long numBytes = dos.size(); - int numBuffers = pair.getRight().size(); - Integer oldNumBuffersOnDisk = - numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers); - if (oldNumBuffersOnDisk != null) { - numDataBuffersOnDisk.replace(partitionId, - 
oldNumBuffersOnDisk + numBuffers); - } - return numBytes; - } - - /** - * Looks through all partitions that their data is not in the data store (is - * offloaded to disk), and sees if any of them has enough raw data buffer in - * memory. If so, puts that partition in a list to return. - * - * @return Set of partition ids of all partition raw buffers where the - * aggregate size of buffers are large enough and it is worth flushing - * those buffers to disk - */ - public Set<Integer> getCandidateBuffersToOffload() { - Set<Integer> result = new HashSet<>(); - for (Map.Entry<Integer, Pair<Integer, List<T>>> entry : - dataBuffers.entrySet()) { - if (entry.getValue().getLeft() > minBufferSizeToOffload) { - result.add(entry.getKey()); - } - } - return result; - } - - /** - * Creates the path to read/write partition data from/to for a given - * partition. - * - * @param basePath path prefix to create the actual path from - * @param partitionId id of the partition - * @return path to read/write data from/to - */ - private static String getPath(String basePath, int partitionId) { - return basePath + "-P" + partitionId; - } - - /** - * Creates the path to read/write raw data buffers of a given partition - * from/to. - * - * @param basePath path prefix to create the actual path from - * @param partitionId id of the partition - * @return path to read/write raw data buffer to/from - */ - private static String getBuffersPath(String basePath, int partitionId) { - return getPath(basePath, partitionId) + "_buffers"; - } - - /** - * Writes a single raw entry to a given output stream. - * - * @param entry entry to write to output - * @param out output stream to write the entry to - * @throws IOException - */ - protected abstract void writeEntry(T entry, DataOutput out) - throws IOException; - - /** - * Reads the next available raw entry from a given input stream. 
- * - * @param in input stream to read the entry from - * @return entry read from an input stream - * @throws IOException - */ - protected abstract T readNextEntry(DataInput in) throws IOException; - - /** - * Loads data of a partition into data store. Returns number of bytes loaded. - * - * @param partitionId id of the partition to load its data - * @param path path from which data should be loaded - * @return number of bytes loaded from disk to memory - * @throws IOException - */ - protected abstract long loadInMemoryPartitionData(int partitionId, - String path) - throws IOException; - - /** - * Offloads data of a partition in data store to disk. Returns the number of - * bytes offloaded to disk - * - * @param partitionId id of the partition to offload to disk - * @param path path to which data should be offloaded - * @return number of bytes offloaded from memory to disk - * @throws IOException - */ - protected abstract long offloadInMemoryPartitionData(int partitionId, - String path) - throws IOException; - - /** - * Gets the size of a given entry in bytes. - * - * @param entry input entry to find its size - * @return size of given input entry in bytes - */ - protected abstract int entrySerializedSize(T entry); - - /** - * Adds a single entry for a given partition to the in-memory data store. 
- * - * @param partitionId id of the partition to add the data to - * @param entry input entry to add to the data store - */ - protected abstract void addEntryToImMemoryPartitionData(int partitionId, - T entry); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/3793c9ef/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java b/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java deleted file mode 100644 index e84ad29..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/io/IOCommand.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.ooc.io; - -import org.apache.giraph.ooc.OutOfCoreEngine; - -import java.io.IOException; - -/** - * Representation of an IO command (moving data to disk/memory) used in - * out-of-core mechanism. 
- */ -public abstract class IOCommand { - /** Type of IO command */ - public enum IOCommandType { - /** Loading a partition */ - LOAD_PARTITION, - /** Storing a partition */ - STORE_PARTITION, - /** Storing incoming messages of a partition */ - STORE_MESSAGE, - /** - * Storing message/buffer raw data buffer of a currently out-of-core - * partition - */ - STORE_BUFFER, - /** Doing nothing regarding IO */ - WAIT - } - - /** Id of the partition involved for the IO */ - protected final int partitionId; - /** Out-of-core engine */ - protected final OutOfCoreEngine oocEngine; - /** - * Number of bytes transferred to/from memory (loaded/stored) during the - * execution of the command - */ - protected long numBytesTransferred; - - /** - * Constructor - * - * @param oocEngine Out-of-core engine - * @param partitionId Id of the partition involved in the IO - */ - public IOCommand(OutOfCoreEngine oocEngine, int partitionId) { - this.oocEngine = oocEngine; - this.partitionId = partitionId; - this.numBytesTransferred = 0; - } - - /** - * Get the id of the partition involved in the IO - * - * @return id of the partition - */ - public int getPartitionId() { - return partitionId; - } - - /** - * Execute (load/store of data) the IO command, and change the data stores - * appropriately based on the data loaded/stored. Return true iff the command - * is actually executed (resulted in loading or storing data). - * - * @param basePath the base path (prefix) to the files/folders IO command - * should read/write data from/to - * @return whether the command is actually executed - * @throws IOException - */ - public abstract boolean execute(String basePath) throws IOException; - - /** - * Get the type of the command. - * - * @return type of the command - */ - public abstract IOCommandType getType(); - - /** - * Get the number of bytes transferred (loaded/stored from/to disk). 
- * - * @return number of bytes transferred during the execution of the command - */ - public long bytesTransferred() { - return numBytesTransferred; - } -} -
