Repository: giraph Updated Branches: refs/heads/trunk 7b4a5b7fa -> 4b743f163
GIRAPH-1008: Create Computation per thread instead of per partition Summary: Currently we create a Computation per partition, but there can be many more partitions than compute threads, and a Computation can hold large objects or its pre/post superstep can be expensive. Test Plan: mvn clean verify; one of the tests was relying on per partition threads, so that test was modified. Reviewers: ikabiljo Differential Revision: https://reviews.facebook.net/D38793 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4b743f16 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4b743f16 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4b743f16 Branch: refs/heads/trunk Commit: 4b743f163616281d98af6684280e9caad7470358 Parents: 7b4a5b7 Author: Maja Kabiljo <[email protected]> Authored: Thu May 21 12:08:17 2015 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Thu May 21 16:25:00 2015 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../org/apache/giraph/graph/ComputeCallable.java | 16 ++++++++-------- .../examples/TestComputationStateComputation.java | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4b743f16/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 442ade3..bb5e7e8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-1008: Create Computation per thread instead of per partition (majakabiljo) + GIRAPH-1004: Allow changing hadoop output format (majakabiljo) GIRAPH-1002: Improve message changing through iters (ikabiljo via edunov) http://git-wip-us.apache.org/repos/asf/giraph/blob/4b743f16/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 226087c..16c798c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -148,6 +148,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, vertexWriter = serviceWorker.getSuperstepOutput().getVertexWriter(); + Computation<I, V, E, M1, M2> computation = + (Computation<I, V, E, M1, M2>) configuration.createComputation(); + computation.initialize(graphState, workerClientRequestProcessor, + serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext); + computation.preSuperstep(); + List<PartitionStats> partitionStatsList = Lists.newArrayList(); while (!partitionIdQueue.isEmpty()) { Integer partitionId = partitionIdQueue.poll(); @@ -159,12 +165,6 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, serviceWorker.getPartitionStore().getOrCreatePartition(partitionId); long startTime = System.currentTimeMillis(); - Computation<I, V, E, M1, M2> computation = - (Computation<I, V, E, M1, M2>) configuration.createComputation(); - computation.initialize(graphState, workerClientRequestProcessor, - serviceWorker.getGraphTaskManager(), aggregatorUsage, workerContext); - computation.preSuperstep(); - try { PartitionStats partitionStats = computePartition(computation, partition); @@ -190,12 +190,12 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, serviceWorker.getPartitionStore().putPartition(partition); } - computation.postSuperstep(); - histogramComputePerPartition.update( System.currentTimeMillis() - startTime); } + computation.postSuperstep(); + // Return VertexWriter after the usage 
serviceWorker.getSuperstepOutput().returnVertexWriter(vertexWriter); http://git-wip-us.apache.org/repos/asf/giraph/blob/4b743f16/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java index ad72951..133e561 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java @@ -90,7 +90,7 @@ public class TestComputationStateComputation extends BasicComputation< @Override public void postSuperstep() { assertEquals(totalCounter.get(), - NUM_PARTITIONS * superstepCounter + getTotalNumVertices()); + NUM_COMPUTE_THREADS * superstepCounter + getTotalNumVertices()); } }
