Repository: giraph Updated Branches: refs/heads/trunk b51ecd27c -> b0262f8c8
[GIRAPH-1089] Fix a bug in out-of-core infrastructure Summary: This diff fixes a bug in the out-of-core infrastructure that caused the user requirement (max number of partitions in memory) for the fixed out-of-core strategy to be violated. The cause of the problem was the unclear definition of in-memory partitions. In this diff, we distinguish the partitions that are entirely in memory from those that are partially in memory. Test Plan: mvn clean verify Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D60573 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b0262f8c Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b0262f8c Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b0262f8c Branch: refs/heads/trunk Commit: b0262f8c81c352c0cf3ac11e1e98646aa9587944 Parents: b51ecd2 Author: Hassan Eslami <[email protected]> Authored: Tue Jul 12 11:33:38 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Jul 12 11:33:38 2016 -0700 ---------------------------------------------------------------------- .../org/apache/giraph/ooc/OutOfCoreEngine.java | 2 +- .../apache/giraph/ooc/OutOfCoreIOCallable.java | 4 +- .../apache/giraph/ooc/OutOfCoreIOScheduler.java | 4 +- .../giraph/ooc/data/MetaPartitionManager.java | 202 ++++++++++++++++--- .../ooc/persistence/LocalDiskDataAccessor.java | 4 - .../ooc/policy/FixedPartitionsOracle.java | 15 +- .../giraph/partition/TestPartitionStores.java | 18 +- .../java/org/apache/giraph/TestOutOfCore.java | 49 +++-- 8 files changed, 230 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java index d5bfd4f..65399b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java @@ -491,7 +491,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver { superstepMetrics.getGauge(GRAPH_PERCENTAGE_IN_MEMORY, new Gauge<Double>() { @Override public Double value() { - return metaPartitionManager.getLowestGraphFractionInMemory() * 100; + return metaPartitionManager.getGraphFractionInMemory() * 100; } }); } http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java index 829ad80..c21be95 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java @@ -76,7 +76,7 @@ public class OutOfCoreIOCallable implements Callable<Void>, while (true) { oocEngine.getSuperstepLock().readLock().lock(); IOCommand command = oocEngine.getIOScheduler().getNextIOCommand(diskId); - if (LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) { LOG.info("call: thread " + diskId + "'s next IO command is: " + command); } @@ -101,7 +101,7 @@ public class OutOfCoreIOCallable implements Callable<Void>, timeInGC = oocEngine.getServiceWorker().getGraphTaskManager() .getSuperstepGCTime() - timeInGC; bytes = command.bytesTransferred(); - if (LOG.isInfoEnabled()) { + if (LOG.isInfoEnabled() && !(command instanceof WaitIOCommand)) { LOG.info("call: thread " + diskId + "'s command " + command + " completed: bytes= " 
+ bytes + ", duration=" + duration + ", " + "bandwidth=" + String.format("%.2f", (double) bytes / duration * http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java index 906607d..3dc1019 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOScheduler.java @@ -102,8 +102,8 @@ public class OutOfCoreIOScheduler { } OutOfCoreOracle.IOAction[] actions = oocEngine.getOracle().getNextIOActions(); - if (LOG.isInfoEnabled()) { - LOG.info("getNextIOCommand: actions are " + Arrays.toString(actions)); + if (LOG.isDebugEnabled()) { + LOG.debug("getNextIOCommand: actions are " + Arrays.toString(actions)); } // Check whether there are any urgent outstanding load requests if (!threadLoadCommandQueue.get(threadId).isEmpty()) { http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java index 3075829..173b451 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java @@ -54,13 +54,31 @@ public class MetaPartitionManager { /** Different storage states for data */ private enum StorageState { IN_MEM, ON_DISK, IN_TRANSIT }; /** + * Different storage states for a partition as a whole (i.e. 
the partition + * and its current messages) + */ + private enum PartitionStorageState + /** + * Either both partition and its current messages are in memory, or both + * are on disk, or one part is on disk and the other part is in memory. + */ + { FULLY_IN_MEM, PARTIALLY_IN_MEM, FULLY_ON_DISK }; + /** * Different processing states for partitions. Processing states are reset * at the beginning of each iteration cycle over partitions. */ private enum ProcessingState { PROCESSED, UNPROCESSED, IN_PROCESS }; - /** Number of in-memory partitions */ + /** + * Number of partitions in-memory (partition and current messages in memory) + */ private final AtomicInteger numInMemoryPartitions = new AtomicInteger(0); + /** + * Number of partitions that are partially in-memory (either partition or its + * current messages is in memory and the other part is not) + */ + private final AtomicInteger numPartiallyInMemoryPartitions = + new AtomicInteger(0); /** Map (dictionary) of partitions to their meta information */ private final ConcurrentMap<Integer, MetaPartition> partitions = Maps.newConcurrentMap(); @@ -136,8 +154,6 @@ public class MetaPartitionManager { } /** - * Get number of partitions in memory - * * @return number of partitions in memory */ public int getNumInMemoryPartitions() { @@ -145,6 +161,13 @@ public class MetaPartitionManager { } /** + * @return number of partitions that are partially in memory + */ + public int getNumPartiallyInMemoryPartitions() { + return numPartiallyInMemoryPartitions.get(); + } + + /** * Get total number of partitions * * @return total number of partitions @@ -153,8 +176,16 @@ public class MetaPartitionManager { return partitions.size(); } - public double getLowestGraphFractionInMemory() { - return lowestGraphFractionInMemory.get(); + /** + * Since the statistics are based on estimates, we assume each partial + * partition is taking about half of the full partition in terms of memory + * footprint. 
+ * + * @return estimate of fraction of graph in memory + */ + public double getGraphFractionInMemory() { + return (getNumInMemoryPartitions() + + getNumPartiallyInMemoryPartitions() / 2.0) / getNumPartitions(); } /** @@ -162,8 +193,7 @@ public class MetaPartitionManager { * information in one of the counters. */ private synchronized void updateGraphFractionInMemory() { - double graphInMemory = - (double) getNumInMemoryPartitions() / getNumPartitions(); + double graphInMemory = getGraphFractionInMemory(); if (graphInMemory < lowestGraphFractionInMemory.get()) { lowestGraphFractionInMemory.set(graphInMemory); WorkerProgress.get().updateLowestGraphPercentageInMemory( @@ -172,6 +202,26 @@ public class MetaPartitionManager { } /** + * Update the book-keeping about number of in-memory partitions and partially + * in-memory partitions with regard to the storage status of the partition and + * its current messages before and after an update to its status. + * + * @param stateBefore the storage state of the partition and its current + * messages before an update + * @param stateAfter the storage state of the partition and its current + * messages after an update + */ + private void updateCounters(PartitionStorageState stateBefore, + PartitionStorageState stateAfter) { + numInMemoryPartitions.getAndAdd( + ((stateAfter == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0) - + ((stateBefore == PartitionStorageState.FULLY_IN_MEM) ? 1 : 0)); + numPartiallyInMemoryPartitions.getAndAdd( + ((stateAfter == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0) - + ((stateBefore == PartitionStorageState.PARTIALLY_IN_MEM) ? 1 : 0)); + } + + /** * Whether a given partition is available * * @param partitionId id of the partition to check if this worker owns it @@ -266,49 +316,63 @@ public class MetaPartitionManager { } /** - * Get id of a partition to offload on disk + * Get id of a partition to offload to disk. Prioritize offloading processed + * partitions over unprocessed partition. 
Also, prioritize offloading + partitions partially in memory over partitions fully in memory. * * @param threadId id of the thread who is going to store the partition on * disk * @return id of the partition to offload on disk */ public Integer getOffloadPartitionId(int threadId) { + // First, look for a processed partition partially on disk MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.PROCESSED, StorageState.IN_MEM, - StorageState.IN_MEM, + StorageState.ON_DISK, null); if (meta != null) { return meta.getPartitionId(); } meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.PROCESSED, + StorageState.ON_DISK, StorageState.IN_MEM, - null, null); if (meta != null) { return meta.getPartitionId(); } + // Second, look for a processed partition entirely in memory meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.PROCESSED, - null, + StorageState.IN_MEM, StorageState.IN_MEM, null); if (meta != null) { return meta.getPartitionId(); } + // Third, look for an unprocessed partition partially on disk meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.UNPROCESSED, StorageState.IN_MEM, - null, + StorageState.ON_DISK, null); if (meta != null) { return meta.getPartitionId(); } meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.UNPROCESSED, - null, + StorageState.ON_DISK, + StorageState.IN_MEM, + null); + if (meta != null) { + return meta.getPartitionId(); + } + // Fourth, look for an unprocessed partition entirely in memory + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.IN_MEM, StorageState.IN_MEM, null); if (meta != null) { @@ -371,7 +435,11 @@ public class MetaPartitionManager { } /** - * Get id of a partition to offload its incoming message on disk + * Get id of a partition to offload its incoming messages to disk.
Prioritize + * offloading messages of partitions already on disk, and then partitions + * in-transit, over partitions in-memory. Also, prioritize processed + * partitions over unprocessed (processed partitions would go on disk with + * more chances that unprocessed partitions) * * @param threadId id of the thread who is going to store the incoming * messages on disk @@ -389,7 +457,14 @@ public class MetaPartitionManager { if (meta != null) { return meta.getPartitionId(); } - + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.IN_TRANSIT, + null, + StorageState.IN_MEM); + if (meta != null) { + return meta.getPartitionId(); + } meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.UNPROCESSED, StorageState.ON_DISK, @@ -398,16 +473,27 @@ public class MetaPartitionManager { if (meta != null) { return meta.getPartitionId(); } + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.IN_TRANSIT, + null, + StorageState.IN_MEM); + if (meta != null) { + return meta.getPartitionId(); + } return null; } /** - * Get id of a partition to load its data to memory + * Get id of a partition to load its data to memory. Prioritize loading an + * unprocessed partition over loading processed partition. Also, prioritize + * loading a partition partially in memory over partitions entirely on disk. 
* * @param threadId id of the thread who is going to load the partition data * @return id of the partition to load its data to memory */ public Integer getLoadPartitionId(int threadId) { + // First, look for an unprocessed partition partially in memory MetaPartition meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.UNPROCESSED, StorageState.IN_MEM, @@ -420,29 +506,51 @@ public class MetaPartitionManager { meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.UNPROCESSED, StorageState.ON_DISK, - null, + StorageState.IN_MEM, null); if (meta != null) { return meta.getPartitionId(); } + // Second, look for an unprocessed partition entirely on disk + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.UNPROCESSED, + StorageState.ON_DISK, + StorageState.ON_DISK, + null); + if (meta != null) { + return meta.getPartitionId(); + } + + // Third, look for a processed partition partially in memory meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.PROCESSED, + StorageState.IN_MEM, StorageState.ON_DISK, - null, null); if (meta != null) { - meta.getPartitionId(); + return meta.getPartitionId(); } meta = perThreadPartitionDictionary.get(threadId).lookup( ProcessingState.PROCESSED, - null, StorageState.ON_DISK, + StorageState.IN_MEM, null); if (meta != null) { - meta.getPartitionId(); + return meta.getPartitionId(); } + + // Fourth, look for a processed partition entirely on disk + meta = perThreadPartitionDictionary.get(threadId).lookup( + ProcessingState.PROCESSED, + StorageState.ON_DISK, + StorageState.ON_DISK, + null); + if (meta != null) { + return meta.getPartitionId(); + } + return null; } @@ -536,9 +644,9 @@ public class MetaPartitionManager { */ public void doneLoadingPartition(int partitionId, long superstep) { MetaPartition meta = partitions.get(partitionId); - numInMemoryPartitions.getAndIncrement(); int owner = getOwnerThreadId(partitionId); synchronized (meta) { +
PartitionStorageState stateBefore = meta.getPartitionStorageState(); perThreadPartitionDictionary.get(owner).removePartition(meta); meta.setPartitionState(StorageState.IN_MEM); if (superstep == oocEngine.getSuperstep()) { @@ -546,6 +654,8 @@ public class MetaPartitionManager { } else { meta.setIncomingMessagesState(StorageState.IN_MEM); } + PartitionStorageState stateAfter = meta.getPartitionStorageState(); + updateCounters(stateBefore, stateAfter); // Check whether load was to prefetch a partition from disk to memory for // the next superstep if (meta.getProcessingState() == ProcessingState.PROCESSED) { @@ -553,6 +663,7 @@ public class MetaPartitionManager { } perThreadPartitionDictionary.get(owner).addPartition(meta); } + updateGraphFractionInMemory(); } /** @@ -631,8 +742,16 @@ public class MetaPartitionManager { (meta.getPartitionState() == StorageState.IN_MEM || meta.getCurrentMessagesState() == StorageState.IN_MEM)) { perThreadPartitionDictionary.get(owner).removePartition(meta); - meta.setPartitionState(StorageState.IN_TRANSIT); - meta.setCurrentMessagesState(StorageState.IN_TRANSIT); + // We may only need to offload either partition or current messages of + // that partition to disk. So, if either of the components (partition + // or its current messages) is already on disk, we should not update its + // metadata. 
+ if (meta.getPartitionState() != StorageState.ON_DISK) { + meta.setPartitionState(StorageState.IN_TRANSIT); + } + if (meta.getCurrentMessagesState() != StorageState.ON_DISK) { + meta.setCurrentMessagesState(StorageState.IN_TRANSIT); + } perThreadPartitionDictionary.get(owner).addPartition(meta); return true; } else { @@ -648,16 +767,23 @@ public class MetaPartitionManager { * @param partitionId id of the partition that its data is offloaded */ public void doneOffloadingPartition(int partitionId) { - numInMemoryPartitions.getAndDecrement(); - updateGraphFractionInMemory(); MetaPartition meta = partitions.get(partitionId); int owner = getOwnerThreadId(partitionId); synchronized (meta) { + // We either offload both partition and its messages to disk, or we only + // offload one of the components. + if (meta.getCurrentMessagesState() == StorageState.IN_TRANSIT && + meta.getPartitionState() == StorageState.IN_TRANSIT) { + numInMemoryPartitions.getAndDecrement(); + } else { + numPartiallyInMemoryPartitions.getAndDecrement(); + } perThreadPartitionDictionary.get(owner).removePartition(meta); meta.setPartitionState(StorageState.ON_DISK); meta.setCurrentMessagesState(StorageState.ON_DISK); perThreadPartitionDictionary.get(owner).addPartition(meta); } + updateGraphFractionInMemory(); } /** @@ -675,8 +801,6 @@ public class MetaPartitionManager { dictionary.reset(); } numPartitionsProcessed.set(0); - lowestGraphFractionInMemory.set((double) getNumInMemoryPartitions() / - getNumPartitions()); } /** @@ -687,11 +811,10 @@ public class MetaPartitionManager { for (MetaPartition meta : partitions.values()) { int owner = getOwnerThreadId(meta.getPartitionId()); perThreadPartitionDictionary.get(owner).removePartition(meta); + PartitionStorageState stateBefore = meta.getPartitionStorageState(); meta.resetMessages(); - if (meta.getPartitionState() == StorageState.IN_MEM && - meta.getCurrentMessagesState() == StorageState.ON_DISK) { - numInMemoryPartitions.getAndDecrement(); - } + 
PartitionStorageState stateAfter = meta.getPartitionStorageState(); + updateCounters(stateBefore, stateAfter); perThreadPartitionDictionary.get(owner).addPartition(meta); } } @@ -863,6 +986,21 @@ public class MetaPartitionManager { currentMessagesState = incomingMessagesState; incomingMessagesState = StorageState.IN_MEM; } + + /** + * @return the state of the partition and its current messages as a whole + */ + public PartitionStorageState getPartitionStorageState() { + if (partitionState == StorageState.ON_DISK && + currentMessagesState == StorageState.ON_DISK) { + return PartitionStorageState.FULLY_ON_DISK; + } else if (partitionState == StorageState.IN_MEM && + currentMessagesState == StorageState.IN_MEM) { + return PartitionStorageState.FULLY_IN_MEM; + } else { + return PartitionStorageState.PARTIALLY_IN_MEM; + } + } } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java index 2e42906..8efa9de 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java @@ -168,8 +168,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor { LocalDiskDataInputWrapper(String fileName, byte[] buffer) throws IOException { file = new File(fileName); - LOG.info("LocalDiskDataInputWrapper: obtaining a data input from local " + - "file " + file.getAbsolutePath()); if (LOG.isDebugEnabled()) { LOG.debug("LocalDiskDataInputWrapper: obtaining a data input from " + "local file " + file.getAbsolutePath()); @@ -216,8 +214,6 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor 
{ LocalDiskDataOutputWrapper(String fileName, boolean shouldAppend, byte[] buffer) throws IOException { file = new File(fileName); - LOG.info("LocalDiskDataOutputWrapper: obtaining a data output from " + - "local file " + file.getAbsolutePath()); if (LOG.isDebugEnabled()) { LOG.debug("LocalDiskDataOutputWrapper: obtaining a data output from " + "local file " + file.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java index ffc5f7f..002dc85 100644 --- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java +++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java @@ -29,6 +29,8 @@ import org.apache.log4j.Logger; import java.util.concurrent.atomic.AtomicInteger; +import static com.google.common.base.Preconditions.checkState; + /** Oracle for fixed out-of-core mechanism */ public class FixedPartitionsOracle implements OutOfCoreOracle { /** Class logger */ @@ -63,11 +65,16 @@ public class FixedPartitionsOracle implements OutOfCoreOracle { public IOAction[] getNextIOActions() { int numPartitionsInMemory = oocEngine.getMetaPartitionManager().getNumInMemoryPartitions(); - if (LOG.isInfoEnabled()) { - LOG.info("getNextIOActions: calling with " + numPartitionsInMemory + - " partitions in memory, " + deltaNumPartitionsInMemory.get() + - " to be loaded"); + int numPartialPartitionsInMemory = + oocEngine.getMetaPartitionManager().getNumPartiallyInMemoryPartitions(); + if (LOG.isDebugEnabled()) { + LOG.debug("getNextIOActions: calling with " + numPartitionsInMemory + + " partitions entirely in memory and " + numPartialPartitionsInMemory + + " partitions partially in 
memory, " + + deltaNumPartitionsInMemory.get() + " to be loaded"); } + checkState(numPartitionsInMemory >= 0); + checkState(numPartialPartitionsInMemory >= 0); int numPartitions = numPartitionsInMemory + deltaNumPartitionsInMemory.get(); // Fixed out-of-core policy: http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java index a7451bc..1e4593b 100644 --- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java +++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java @@ -164,12 +164,12 @@ public class TestPartitionStores { serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); + GraphTaskManager<IntWritable, IntWritable, NullWritable> + graphTaskManager = Mockito.mock(GraphTaskManager.class); + Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); ServerData<IntWritable, IntWritable, NullWritable> serverData = new ServerData<>(serviceWorker, conf, context); Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); - GraphTaskManager<IntWritable, IntWritable, NullWritable> - graphTaskManager = new GraphTaskManager<>(context); - Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = @@ -193,12 +193,12 @@ public class TestPartitionStores { serviceWorker = Mockito.mock(CentralizedServiceWorker.class); Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); + GraphTaskManager<IntWritable, IntWritable, NullWritable> + graphTaskManager = 
Mockito.mock(GraphTaskManager.class); + Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); ServerData<IntWritable, IntWritable, NullWritable> serverData = new ServerData<>(serviceWorker, conf, context); Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); - GraphTaskManager<IntWritable, IntWritable, NullWritable> - graphTaskManager = new GraphTaskManager<>(context); - Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> partitionStore = @@ -311,12 +311,12 @@ public class TestPartitionStores { Mockito.when(serviceWorker.getSuperstep()).thenReturn( BspService.INPUT_SUPERSTEP); + GraphTaskManager<IntWritable, IntWritable, NullWritable> + graphTaskManager = Mockito.mock(GraphTaskManager.class); + Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); ServerData<IntWritable, IntWritable, NullWritable> serverData = new ServerData<>(serviceWorker, conf, context); Mockito.when(serviceWorker.getServerData()).thenReturn(serverData); - GraphTaskManager<IntWritable, IntWritable, NullWritable> - graphTaskManager = new GraphTaskManager<>(context); - Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager); DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable> store = http://git-wip-us.apache.org/repos/asf/giraph/blob/b0262f8c/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java index 397605d..e497541 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java @@ -27,6 +27,9 @@ import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertex import 
org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat; import org.apache.giraph.job.GiraphJob; +import org.apache.giraph.ooc.OutOfCoreIOScheduler; +import org.apache.giraph.ooc.persistence.InMemoryDataAccessor; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -38,24 +41,17 @@ import static org.junit.Assert.assertTrue; * Unit test for out-of-core mechanism */ public class TestOutOfCore extends BspCase { - final static int NUM_PARTITIONS = 32; - final static int NUM_PARTITIONS_IN_MEMORY = 16; + private final static int NUM_PARTITIONS = 400; + private final static int NUM_PARTITIONS_IN_MEMORY = 8; + private GiraphConfiguration conf; public TestOutOfCore() { super(TestOutOfCore.class.getName()); } - /** - * Run a job that tests the fixed out-of-core mechanism - * - * @throws IOException - * @throws ClassNotFoundException - * @throws InterruptedException - */ - @Test - public void testOutOfCore() - throws IOException, InterruptedException, ClassNotFoundException { - GiraphConfiguration conf = new GiraphConfiguration(); + @Before + public void prepareTest() { + conf = new GiraphConfiguration(); conf.setComputationClass(SimplePageRankComputation.class); conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class); conf.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class); @@ -66,12 +62,37 @@ public class TestOutOfCore extends BspCase { GiraphConstants.METRICS_ENABLE.set(conf, true); GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS); GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true); - NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.set(conf, true); GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY); + OutOfCoreIOScheduler.OOC_WAIT_INTERVAL.set(conf, 10); GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8); GiraphConstants.NUM_INPUT_THREADS.set(conf, 8); GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8); + } + + @Test + public void 
testOutOfCoreInMemoryAccessor() + throws IOException, InterruptedException, ClassNotFoundException { + GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.set(conf, InMemoryDataAccessor.class); + GiraphConstants.NUM_OUT_OF_CORE_THREADS.set(conf, 8); + runTest(); + } + + @Test + public void testOutOfCoreLocalDiskAccessor() + throws IOException, InterruptedException, ClassNotFoundException { GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2"); + runTest(); + } + + /** + * Run a job with fixed out-of-core policy and verify the result + * + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + private void runTest() + throws IOException, InterruptedException, ClassNotFoundException { GiraphJob job = prepareJob(getCallingMethodName(), conf, getTempPath(getCallingMethodName())); // Overwrite the number of vertices set in BspCase
