Updated Branches: refs/heads/trunk 86c2f657f -> a6cb05bcb
GIRAPH-504: Create PartitionContext (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a6cb05bc Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a6cb05bc Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a6cb05bc Branch: refs/heads/trunk Commit: a6cb05bcb4f0fcbf7477297f15237d3536b6d658 Parents: 86c2f65 Author: Maja Kabiljo <[email protected]> Authored: Fri Feb 8 16:07:10 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Fri Feb 8 16:09:03 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../java/org/apache/giraph/conf/GiraphClasses.java | 36 +++++ .../apache/giraph/conf/GiraphConfiguration.java | 13 ++ .../org/apache/giraph/conf/GiraphConstants.java | 2 + .../conf/ImmutableClassesGiraphConfiguration.java | 19 +++ .../examples/PartitionContextTestVertex.java | 115 +++++++++++++++ .../org/apache/giraph/graph/ComputeCallable.java | 15 ++ .../java/org/apache/giraph/graph/GraphState.java | 11 ++ .../apache/giraph/partition/BasicPartition.java | 106 +++++++++++++ .../giraph/partition/ByteArrayPartition.java | 64 ++------- .../giraph/partition/DefaultPartitionContext.java | 34 +++++ .../org/apache/giraph/partition/Partition.java | 12 ++ .../apache/giraph/partition/PartitionContext.java | 45 ++++++ .../apache/giraph/partition/SimplePartition.java | 56 +------ .../main/java/org/apache/giraph/vertex/Vertex.java | 10 ++ .../src/test/java/org/apache/giraph/BspCase.java | 3 + .../org/apache/giraph/TestPartitionContext.java | 72 +++++++++ 17 files changed, 517 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 3524ae3..c060209 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph 
Change Log Release 0.2.0 - unreleased + GIRAPH-504: Create PartitionContext (majakabiljo) + GIRAPH-499: Giraph should not reserve minimum reduce slot memory 1024 since we never use it (ereisman) GIRAPH-508: Increase the limit on the number of partitions (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index d2641f1..5c2a01a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -21,6 +21,8 @@ import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.Combiner; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.graph.DefaultVertexResolver; +import org.apache.giraph.partition.DefaultPartitionContext; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.worker.DefaultWorkerContext; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.master.MasterCompute; @@ -83,6 +85,8 @@ public class GiraphClasses<I extends WritableComparable, /** Vertex resolver class - cached for fast access */ protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass; + /** Partition context class - cached for fast access */ + protected Class<? extends PartitionContext> partitionContextClass; /** Worker context class - cached for fast access */ protected Class<? extends WorkerContext> workerContextClass; /** Master compute class - cached for fast access */ @@ -145,6 +149,8 @@ public class GiraphClasses<I extends WritableComparable, vertexResolverClass = (Class<? 
extends VertexResolver<I, V, E, M>>) conf.getClass(VERTEX_RESOLVER_CLASS, DefaultVertexResolver.class, VertexResolver.class); + partitionContextClass = conf.getClass(PARTITION_CONTEXT_CLASS, + DefaultPartitionContext.class, PartitionContext.class); workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS, DefaultWorkerContext.class, WorkerContext.class); masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS, @@ -329,6 +335,24 @@ public class GiraphClasses<I extends WritableComparable, } /** + * Check if PartitionContext is set + * + * @return true if PartitionContext is set + */ + public boolean hasPartitionContextClass() { + return partitionContextClass != null; + } + + /** + * Get PartitionContext used + * + * @return PartitionContext + */ + public Class<? extends PartitionContext> getPartitionContextClass() { + return partitionContextClass; + } + + /** * Check if WorkerContext is set * * @return true if WorkerContext is set @@ -523,6 +547,18 @@ public class GiraphClasses<I extends WritableComparable, } /** + * Set PartitionContext used + * + * @param partitionContextClass PartitionContext class to set + * @return this + */ + public GiraphClasses setPartitionContextClass( + Class<? 
extends PartitionContext> partitionContextClass) { + this.partitionContextClass = partitionContextClass; + return this; + } + + /** * Set WorkerContext used * * @param workerContextClass WorkerContext class to set http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 7e48103..dc5c84f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -30,6 +30,7 @@ import org.apache.giraph.master.MasterCompute; import org.apache.giraph.master.MasterObserver; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.Partition; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerObserver; @@ -276,6 +277,18 @@ public class GiraphConfiguration extends Configuration } /** + * Set the partition context class (optional) + * + * @param partitionContextClass Determines what code is executed for each + * partition before and after each superstep + */ + public final void setPartitionContextClass( + Class<? 
extends PartitionContext> partitionContextClass) { + setClass(PARTITION_CONTEXT_CLASS, partitionContextClass, + PartitionContext.class); + } + + /** * Set the worker context class (optional) * * @param workerContextClass Determines what code is executed on a each http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index fb4e8a3..e3d8ff3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -65,6 +65,8 @@ public interface GiraphConstants { String EDGE_VALUE_CLASS = "giraph.edgeValueClass"; /** Message value class */ String MESSAGE_VALUE_CLASS = "giraph.messageValueClass"; + /** Partition context class */ + String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass"; /** Worker context class */ String WORKER_CONTEXT_CLASS = "giraph.workerContextClass"; /** AggregatorWriter class - optional */ http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 30a7da7..3e158af 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -24,6 +24,7 @@ import org.apache.giraph.job.GiraphJobObserver; import org.apache.giraph.io.EdgeInputFormat; import 
org.apache.giraph.graph.GraphState; import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; @@ -287,6 +288,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Get the user's subclassed PartitionContext. + * + * @return User's partition context class + */ + public Class<? extends PartitionContext> getPartitionContextClass() { + return classes.getPartitionContextClass(); + } + + /** + * Create a user partition context + * + * @return Instantiated user partition context + */ + public PartitionContext createPartitionContext() { + return ReflectionUtils.newInstance(getPartitionContextClass(), this); + } + + /** * Get the user's subclassed WorkerContext. * * @return User's worker context class http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java new file mode 100644 index 0000000..f86c323 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import org.apache.giraph.partition.DefaultPartitionContext; +import org.apache.giraph.vertex.EdgeListVertex; +import org.apache.giraph.worker.DefaultWorkerContext; +import org.apache.giraph.worker.WorkerContext; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; + +import java.io.IOException; + +/** + * Vertex to test the functionality of PartitionContext + */ +public class PartitionContextTestVertex extends + EdgeListVertex<LongWritable, DoubleWritable, FloatWritable, + DoubleWritable> { + /** How many compute threads to use in the test */ + public static final int NUM_COMPUTE_THREADS = 10; + /** How many vertices to create for the test */ + public static final int NUM_VERTICES = 100; + /** How many partitions to have */ + public static final int NUM_PARTITIONS = 25; + + @Override + public void compute(Iterable<DoubleWritable> messages) throws IOException { + TestPartitionContextPartitionContext partitionContext = + (TestPartitionContextPartitionContext) getPartitionContext(); + partitionContext.counter++; + if (getSuperstep() > 5) { + voteToHalt(); + } + } + + /** + * PartitionContext for TestPartitionContext + */ + public static class TestPartitionContextPartitionContext extends + DefaultPartitionContext { + /** + * The counter should hold the number of vertices in this partition, + * plus the current superstep + */ + private long counter; + + @Override + public void preSuperstep(WorkerContext workerContext) { + counter = + 
((TestPartitionContextWorkerContext) workerContext).superstepCounter; + } + + @Override + public void postSuperstep(WorkerContext workerContext) { + ((TestPartitionContextWorkerContext) workerContext).totalCounter += + counter; + } + } + + /** + * WorkerContext for TestPartitionContext + */ + public static class TestPartitionContextWorkerContext extends + DefaultWorkerContext { + /** Current superstep */ + private long superstepCounter; + /** + * This counter should hold the sum of PartitionContext's counters + */ + private long totalCounter; + + @Override + public void preSuperstep() { + superstepCounter = getSuperstep(); + totalCounter = 0; + } + + @Override + public void postSuperstep() { + assertEquals(totalCounter, + NUM_PARTITIONS * superstepCounter + getTotalNumVertices()); + } + } + + /** + * Throws exception if values are not equal. + * + * @param expected Expected value + * @param actual Actual value + */ + private static void assertEquals(long expected, long actual) { + if (expected != actual) { + throw new RuntimeException("expected: " + expected + + ", actual: " + actual); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index 94ed6d9..c7aff7c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -27,6 +27,7 @@ import org.apache.giraph.metrics.MetricNames; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.metrics.TimerDesc; import org.apache.giraph.partition.Partition; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.partition.PartitionStats; import 
org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; @@ -34,6 +35,7 @@ import org.apache.giraph.time.Times; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerThreadAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -197,6 +199,15 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, new PartitionStats(partition.getId(), 0, 0, 0, 0); // Make sure this is thread-safe across runs synchronized (partition) { + // Prepare Partition context + WorkerContext workerContext = + graphState.getGraphTaskManager().getWorkerContext(); + PartitionContext partitionContext = partition.getPartitionContext(); + synchronized (workerContext) { + partitionContext.preSuperstep(workerContext); + } + graphState.setPartitionContext(partition.getPartitionContext()); + for (Vertex<I, V, E, M> vertex : partition) { // Make sure every vertex has this thread's // graphState before computing @@ -229,6 +240,10 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, } messageStore.clearPartition(partition.getId()); + + synchronized (workerContext) { + partitionContext.postSuperstep(workerContext); + } } return partitionStats; } http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java index 9cdec7c..93ad5df 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java @@ -18,6 +18,7 @@ package org.apache.giraph.graph; import 
org.apache.giraph.comm.WorkerClientRequestProcessor; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -49,6 +50,8 @@ E extends Writable, M extends Writable> { workerClientRequestProcessor; /** Worker aggregator usage */ private final WorkerAggregatorUsage workerAggregatorUsage; + /** Partition context */ + private PartitionContext partitionContext; /** * Constructor @@ -106,6 +109,14 @@ E extends Writable, M extends Writable> { return workerAggregatorUsage; } + public void setPartitionContext(PartitionContext partitionContext) { + this.partitionContext = partitionContext; + } + + public PartitionContext getPartitionContext() { + return partitionContext; + } + @Override public String toString() { return "(superstep=" + superstep + ",numVertices=" + numVertices + "," + http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java new file mode 100644 index 0000000..dc9192e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Basic partition class for other partitions to extend. Holds partition id, + * configuration, progressable and partition context + * + * @param <I> Vertex index value + * @param <V> Vertex value + * @param <E> Edge value + * @param <M> Message data + */ +public abstract class BasicPartition<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + implements Partition<I, V, E, M> { + /** Configuration from the worker */ + private ImmutableClassesGiraphConfiguration<I, V, E, M> conf; + /** Partition id */ + private int id; + /** Context used to report progress */ + private Progressable progressable; + /** Partition context */ + private PartitionContext partitionContext; + + @Override + public void initialize(int partitionId, Progressable progressable) { + setId(partitionId); + setProgressable(progressable); + partitionContext = conf.createPartitionContext(); + } + + @Override + public void setConf( + ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) { + conf = configuration; + } + + @Override + public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() { + return conf; + } + + @Override + public int getId() { + return id; + } + + @Override + public void 
setId(int id) { + this.id = id; + } + + @Override + public PartitionContext getPartitionContext() { + return partitionContext; + } + + @Override + public void progress() { + if (progressable != null) { + progressable.progress(); + } + } + + @Override + public void setProgressable(Progressable progressable) { + this.progressable = progressable; + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeInt(id); + } + + @Override + public void readFields(DataInput input) throws IOException { + id = input.readInt(); + partitionContext = conf.createPartitionContext(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java index d34af11..1298918 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.vertex.Vertex; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.WritableUtils; @@ -48,21 +47,15 @@ import org.apache.log4j.Logger; */ public class ByteArrayPartition<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> - implements Partition<I, V, E, M> { + extends BasicPartition<I, V, E, M> { /** Class logger */ private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class); - /** Configuration from the worker */ - private ImmutableClassesGiraphConfiguration<I, V, E, M> conf; - /** Partition id */ - private 
int id; /** * Vertex map for this range (keyed by index). Note that the byte[] is a * serialized vertex with the first four bytes as the length of the vertex * to read. */ private ConcurrentMap<I, byte[]> vertexMap; - /** Context used to report progress */ - private Progressable progressable; /** Representative vertex */ private Vertex<I, V, E, M> representativeVertex; /** Use unsafe serialization */ @@ -75,12 +68,11 @@ public class ByteArrayPartition<I extends WritableComparable, @Override public void initialize(int partitionId, Progressable progressable) { - setId(partitionId); - setProgressable(progressable); + super.initialize(partitionId, progressable); vertexMap = new MapMaker().concurrencyLevel( - conf.getNettyServerExecutionConcurrency()).makeMap(); - representativeVertex = conf.createVertex(); - useUnsafeSerialization = conf.useUnsafeSerialization(); + getConf().getNettyServerExecutionConcurrency()).makeMap(); + representativeVertex = getConf().createVertex(); + useUnsafeSerialization = getConf().useUnsafeSerialization(); } @Override @@ -152,21 +144,6 @@ public class ByteArrayPartition<I extends WritableComparable, } @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - - @Override - public void setProgressable(Progressable progressable) { - this.progressable = progressable; - } - - @Override public void saveVertex(Vertex<I, V, E, M> vertex) { // Reuse the old buffer whenever possible byte[] oldVertexData = vertexMap.get(vertex.getId()); @@ -183,12 +160,10 @@ public class ByteArrayPartition<I extends WritableComparable, @Override public void write(DataOutput output) throws IOException { - output.writeInt(id); + super.write(output); output.writeInt(vertexMap.size()); for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) { - if (progressable != null) { - progressable.progress(); - } + progress(); entry.getKey().write(output); // Note here that we are writing the size of the vertex data first // 
as it is encoded in the first four bytes of the byte[] @@ -207,18 +182,16 @@ public class ByteArrayPartition<I extends WritableComparable, @Override public void readFields(DataInput input) throws IOException { - id = input.readInt(); + super.readFields(input); int size = input.readInt(); vertexMap = new MapMaker().concurrencyLevel( - conf.getNettyServerExecutionConcurrency()).initialCapacity( + getConf().getNettyServerExecutionConcurrency()).initialCapacity( size).makeMap(); - representativeVertex = conf.createVertex(); - useUnsafeSerialization = conf.useUnsafeSerialization(); + representativeVertex = getConf().createVertex(); + useUnsafeSerialization = getConf().useUnsafeSerialization(); for (int i = 0; i < size; ++i) { - if (progressable != null) { - progressable.progress(); - } - I vertexId = conf.createVertexId(); + progress(); + I vertexId = getConf().createVertexId(); vertexId.readFields(input); int vertexDataSize = input.readInt(); byte[] vertexData = new byte[vertexDataSize]; @@ -231,17 +204,6 @@ public class ByteArrayPartition<I extends WritableComparable, } @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) { - conf = configuration; - } - - @Override - public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() { - return conf; - } - - @Override public Iterator<Vertex<I, V, E, M>> iterator() { return new RepresentativeVertexIterator(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java new file mode 100644 index 0000000..c22c802 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java @@ -0,0 +1,34 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import org.apache.giraph.worker.WorkerContext; + +/** + * Empty implementation of {@link PartitionContext} + */ +public class DefaultPartitionContext implements PartitionContext { + @Override + public void preSuperstep(WorkerContext workerContext) { + } + + @Override + public void postSuperstep(WorkerContext workerContext) { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java index 55ce8c0..657c054 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java @@ -107,6 +107,11 @@ public interface Partition<I extends WritableComparable, void setId(int id); /** + * Report progress. + */ + void progress(); + + /** * Set the context. 
* * @param progressable Progressable @@ -119,4 +124,11 @@ public interface Partition<I extends WritableComparable, * @param vertex Vertex to save */ void saveVertex(Vertex<I, V, E, M> vertex); + + /** + * Get partition context + * + * @return Partition context + */ + PartitionContext getPartitionContext(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java new file mode 100644 index 0000000..412f6e3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.partition; + +import org.apache.giraph.worker.WorkerContext; + +/** + * PartitionContext allows for the execution of user code + * on a per-partition basis. There's one PartitionContext per partition. + */ +public interface PartitionContext { + /** + * Execute user code. 
+ * This method is executed once for each partition before computation for + * that partition starts. + * + * @param workerContext Worker context + */ + void preSuperstep(WorkerContext workerContext); + + /** + * Execute user code. + * This method is executed once for each partition after computation in + * current superstep for that partition ends. + * + * @param workerContext Worker context + */ + void postSuperstep(WorkerContext workerContext); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java index 479011f..cbf6bc3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java @@ -19,7 +19,6 @@ package org.apache.giraph.partition; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.vertex.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -46,15 +45,9 @@ import java.util.concurrent.ConcurrentSkipListMap; @SuppressWarnings("rawtypes") public class SimplePartition<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> - implements Partition<I, V, E, M> { - /** Configuration from the worker */ - private ImmutableClassesGiraphConfiguration<I, V, E, M> conf; - /** Partition id */ - private int id; + extends BasicPartition<I, V, E, M> { /** Vertex map for this range (keyed by index) */ private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap; - /** Context used to report progress */ - private Progressable progressable; /** * Constructor for reflection. 
@@ -63,9 +56,8 @@ public class SimplePartition<I extends WritableComparable, @Override public void initialize(int partitionId, Progressable progressable) { - setId(partitionId); - setProgressable(progressable); - if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES, + super.initialize(partitionId, progressable); + if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES, GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) { vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>(); } else { @@ -110,21 +102,6 @@ public class SimplePartition<I extends WritableComparable, } @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - - @Override - public void setProgressable(Progressable progressable) { - this.progressable = progressable; - } - - @Override public void saveVertex(Vertex<I, V, E, M> vertex) { // No-op, vertices are stored as Java objects in this partition } @@ -136,19 +113,17 @@ public class SimplePartition<I extends WritableComparable, @Override public void readFields(DataInput input) throws IOException { - if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES, + super.readFields(input); + if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES, GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) { vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>(); } else { vertexMap = Maps.newConcurrentMap(); } - id = input.readInt(); int vertices = input.readInt(); for (int i = 0; i < vertices; ++i) { - Vertex<I, V, E, M> vertex = conf.createVertex(); - if (progressable != null) { - progressable.progress(); - } + Vertex<I, V, E, M> vertex = getConf().createVertex(); + progress(); vertex.readFields(input); if (vertexMap.put(vertex.getId(), vertex) != null) { throw new IllegalStateException( @@ -160,28 +135,15 @@ public class SimplePartition<I extends WritableComparable, @Override public void write(DataOutput output) throws IOException { - output.writeInt(id); + 
super.write(output); output.writeInt(vertexMap.size()); for (Vertex vertex : vertexMap.values()) { - if (progressable != null) { - progressable.progress(); - } + progress(); vertex.write(output); } } @Override - public void setConf( - ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) { - this.conf = configuration; - } - - @Override - public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() { - return conf; - } - - @Override public Iterator<Vertex<I, V, E, M>> iterator() { return vertexMap.values().iterator(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java index 974232e..db6dca3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java +++ b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java @@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.DefaultEdge; import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.partition.PartitionContext; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; @@ -328,6 +329,15 @@ public abstract class Vertex<I extends WritableComparable, } /** + * Get the partition context + * + * @return Partition context + */ + public PartitionContext getPartitionContext() { + return getGraphState().getPartitionContext(); + } + + /** * Get the worker context * * @return WorkerContext context http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/BspCase.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java index 6aab533..0fe9fda 100644 --- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java +++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java @@ -181,6 +181,9 @@ public class BspCase implements Watcher { if (classes.hasVertexOutputFormat()) { conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass()); } + if (classes.hasPartitionContextClass()) { + conf.setPartitionContextClass(classes.getPartitionContextClass()); + } if (classes.hasWorkerContextClass()) { conf.setWorkerContextClass(classes.getWorkerContextClass()); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java new file mode 100644 index 0000000..cdf1f65 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph; + +import org.apache.giraph.conf.GiraphClasses; +import org.apache.giraph.examples.PartitionContextTestVertex; +import org.apache.giraph.examples.GeneratedVertexReader; +import org.apache.giraph.examples.SimplePageRankVertex; +import org.apache.giraph.job.GiraphJob; +import org.apache.giraph.partition.HashMasterPartitioner; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +public class TestPartitionContext extends BspCase { + public TestPartitionContext() { + super(TestPartitionContext.class.getName()); + } + + @Test + public void testPartitionContext() throws IOException, + ClassNotFoundException, InterruptedException { + if (runningInDistributedMode()) { + System.out.println( + "testComputeContext: Ignore this test in distributed mode."); + return; + } + GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> + classes = new GiraphClasses(); + classes.setVertexClass(PartitionContextTestVertex.class); + classes.setVertexInputFormatClass( + SimplePageRankVertex.SimplePageRankVertexInputFormat.class); + classes.setWorkerContextClass( + PartitionContextTestVertex.TestPartitionContextWorkerContext.class); + classes.setPartitionContextClass( + PartitionContextTestVertex.TestPartitionContextPartitionContext.class); + GiraphJob job = prepareJob(getCallingMethodName(), classes); + // Use multithreading + job.getConfiguration().setNumComputeThreads( + PartitionContextTestVertex.NUM_COMPUTE_THREADS); + // Increase the number of vertices + job.getConfiguration().setInt( + GeneratedVertexReader.READER_VERTICES, + PartitionContextTestVertex.NUM_VERTICES); + // Increase the number of partitions + job.getConfiguration().setInt( + HashMasterPartitioner.USER_PARTITION_COUNT, + PartitionContextTestVertex.NUM_PARTITIONS); + 
assertTrue(job.run(true)); + } +}
