Updated Branches: refs/heads/trunk 0d358c9d5 -> 1eaddd183
GIRAPH-673: Input superstep should support aggregators like any other superstep (Bingjing via aching) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1eaddd18 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1eaddd18 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1eaddd18 Branch: refs/heads/trunk Commit: 1eaddd183259047476893a014fb24e44fc549531 Parents: 0d358c9 Author: Avery Ching <[email protected]> Authored: Wed Jun 19 10:04:28 2013 -0700 Committer: Avery Ching <[email protected]> Committed: Wed Jun 19 10:04:42 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 + .../java/org/apache/giraph/io/EdgeReader.java | 32 +++- .../java/org/apache/giraph/io/VertexReader.java | 32 +++- .../giraph/io/internal/WrappedEdgeReader.java | 7 + .../giraph/io/internal/WrappedVertexReader.java | 7 + .../apache/giraph/master/BspServiceMaster.java | 91 +++++++---- .../apache/giraph/worker/BspServiceWorker.java | 12 +- .../giraph/worker/EdgeInputSplitsCallable.java | 9 ++ .../worker/VertexInputSplitsCallable.java | 7 + .../examples/AggregatorsTestComputation.java | 154 +++++++++++++++++++ .../giraph/examples/GeneratedEdgeReader.java | 73 +++++++++ .../aggregators/TestAggregatorsHandling.java | 13 +- 12 files changed, 400 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 50d411a..76f7e6b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-673: Input superstep should support aggregators like any + other superstep (Bingjing via aching) + GIRAPH-686: DiskBackedPartitionStore does not saveVertex after edges are loaded (claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java index 363a5e6..1bc48e3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java @@ -21,6 +21,7 @@ package org.apache.giraph.io; import java.io.IOException; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.edge.Edge; +import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -36,7 +37,11 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; @SuppressWarnings("rawtypes") public abstract class EdgeReader<I extends WritableComparable, E extends Writable> extends DefaultImmutableClassesGiraphConfigurable< - I, Writable, E> { + I, Writable, E> implements WorkerAggregatorUsage { + + /** Aggregator usage for edge reader */ + private WorkerAggregatorUsage workerAggregatorUsage; + /** * Use the input split and context to setup reading the edges. * Guaranteed to be called prior to any other function. @@ -51,6 +56,21 @@ public abstract class EdgeReader<I extends WritableComparable, throws IOException, InterruptedException; /** + * Set aggregator usage. It provides the functionality + * of aggregation operation in reading an edge. + * It is invoked just after initialization. + * E.g., + * edgeReader.initialize(inputSplit, context); + * edgeReader.setAggregator(aggregatorUsage); + * This method is only for use by the infrastructure. + * + * @param agg aggregator usage for edge reader + */ + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + workerAggregatorUsage = agg; + } + + /** * Read the next edge. * * @return false iff there are no more edges @@ -97,4 +117,14 @@ public abstract class EdgeReader<I extends WritableComparable, * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; + + @Override + public <A extends Writable> void aggregate(String name, A value) { + workerAggregatorUsage.aggregate(name, value); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return workerAggregatorUsage.<A>getAggregatedValue(name); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java index b8b82af..94a4083 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java @@ -20,6 +20,7 @@ package org.apache.giraph.io; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -38,7 +39,11 @@ import java.io.IOException; @SuppressWarnings("rawtypes") public abstract class VertexReader<I extends WritableComparable, V extends Writable, E extends Writable> extends - DefaultImmutableClassesGiraphConfigurable<I, V, E> { + DefaultImmutableClassesGiraphConfigurable<I, V, E> + implements WorkerAggregatorUsage { + /** Aggregator usage for vertex reader */ + private WorkerAggregatorUsage workerAggregatorUsage; + /** * Use the input split and context to setup reading the vertices. * Guaranteed to be called prior to any other function. @@ -53,6 +58,21 @@ public abstract class VertexReader<I extends WritableComparable, throws IOException, InterruptedException; /** + * Set aggregator usage. It provides the functionality + * of aggregation operation in reading a vertex. + * It is invoked just after initialization. + * E.g., + * vertexReader.initialize(inputSplit, context); + * vertexReader.setAggregator(aggregatorUsage); + * This method is only for use by the infrastructure. + * + * @param agg aggregator usage for vertex reader + */ + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + workerAggregatorUsage = agg; + } + + /** * * @return false iff there are no more vertices * @throws IOException @@ -88,4 +108,14 @@ public abstract class VertexReader<I extends WritableComparable, * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; + + @Override + public <A extends Writable> void aggregate(String name, A value) { + workerAggregatorUsage.aggregate(name, value); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return workerAggregatorUsage.<A>getAggregatedValue(name); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java index aae7a72..e3b3689 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -70,6 +71,12 @@ public class WrappedEdgeReader<I extends WritableComparable, } @Override + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + // Set aggregator usage for edge reader + baseEdgeReader.setWorkerAggregatorUse(agg); + } + + @Override public boolean nextEdge() throws IOException, InterruptedException { return baseEdgeReader.nextEdge(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java index 54adfec..bf0a212 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java @@ -21,6 +21,7 @@ package org.apache.giraph.io.internal; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexReader; +import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -71,6 +72,12 @@ public class WrappedVertexReader<I extends WritableComparable, } @Override + public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + // Set aggregator usage for vertex reader + baseVertexReader.setWorkerAggregatorUse(agg); + } + + @Override public boolean nextVertex() throws IOException, InterruptedException { return baseVertexReader.nextVertex(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index bd48116..0d266a6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1425,6 +1425,58 @@ public class BspServiceMaster<I extends WritableComparable, } } + /** + * Initialize aggregator at the master side + * before vertex/edge loading. + * This methods cooperates with other code + * to enables aggregation usage at INPUT_SUPERSTEP + * Other codes are: + * BSPServiceWorker: + * aggregatorHandler.prepareSuperstep in + * setup + * set aggregator usage in vertexReader and + * edgeReader + * + * @throws InterruptedException + */ + private void initializeAggregatorInputSuperstep() + throws InterruptedException { + aggregatorHandler.prepareSuperstep(masterClient); + prepareMasterCompute(getSuperstep()); + try { + masterCompute.initialize(); + } catch (InstantiationException e) { + LOG.fatal( + "initializeAggregatorInputSuperstep: Failed in instantiation", e); + throw new RuntimeException( + "initializeAggregatorInputSuperstep: Failed in instantiation", e); + } catch (IllegalAccessException e) { + LOG.fatal("initializeAggregatorInputSuperstep: Failed in access", e); + throw new RuntimeException( + "initializeAggregatorInputSuperstep: Failed in access", e); + } + aggregatorHandler.finishSuperstep(masterClient); + } + + /** + * This is required before initialization + * and run of MasterCompute + * + * @param superstep superstep for which to run masterCompute + * @return Superstep classes set by masterCompute + */ + private SuperstepClasses prepareMasterCompute(long superstep) { + GraphState graphState = new GraphState(superstep , + GiraphStats.getInstance().getVertices().getValue(), + GiraphStats.getInstance().getEdges().getValue(), + getContext()); + SuperstepClasses superstepClasses = + new SuperstepClasses(getConfiguration()); + masterCompute.setGraphState(graphState); + masterCompute.setSuperstepClasses(superstepClasses); + return superstepClasses; + } + @Override public SuperstepState coordinateSuperstep() throws KeeperException, InterruptedException { @@ -1495,6 +1547,9 @@ public class BspServiceMaster<I extends WritableComparable, } if (getSuperstep() == INPUT_SUPERSTEP) { + // Initialize aggregators before coordinating + // vertex loading and edge loading + initializeAggregatorInputSuperstep(); if (getConfiguration().hasVertexInputFormat()) { coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents, "Vertex"); @@ -1516,7 +1571,9 @@ public class BspServiceMaster<I extends WritableComparable, // Collect aggregator values, then run the master.compute() and // finally save the aggregator values aggregatorHandler.prepareSuperstep(masterClient); - SuperstepClasses superstepClasses = runMasterCompute(getSuperstep()); + SuperstepClasses superstepClasses = + prepareMasterCompute(getSuperstep() + 1); + doMasterCompute(); // If the master is halted or all the vertices voted to halt and there // are no more messages in the system, stop the computation @@ -1569,39 +1626,13 @@ public class BspServiceMaster<I extends WritableComparable, } /** - * Run the master.compute() class - * - * @param superstep superstep for which to run the master.compute() - * @return Superstep classes set by Master compute + * This doMasterCompute is only called + * after masterCompute is initialized */ - private SuperstepClasses runMasterCompute(long superstep) { - // The master.compute() should run logically before the workers, so - // increase the superstep counter it uses by one - GraphState graphState = new GraphState(superstep + 1, - GiraphStats.getInstance().getVertices().getValue(), - GiraphStats.getInstance().getEdges().getValue(), - getContext()); - SuperstepClasses superstepClasses = - new SuperstepClasses(getConfiguration()); - masterCompute.setGraphState(graphState); - masterCompute.setSuperstepClasses(superstepClasses); - if (superstep == INPUT_SUPERSTEP) { - try { - masterCompute.initialize(); - } catch (InstantiationException e) { - LOG.fatal("runMasterCompute: Failed in instantiation", e); - throw new RuntimeException( - "runMasterCompute: Failed in instantiation", e); - } catch (IllegalAccessException e) { - LOG.fatal("runMasterCompute: Failed in access", e); - throw new RuntimeException( - "runMasterCompute: Failed in access", e); - } - } + private void doMasterCompute() { GiraphTimerContext timerContext = masterComputeTimer.time(); masterCompute.compute(); timerContext.stop(); - return superstepClasses; } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index eb6d30d..342e2b2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -480,11 +480,15 @@ public class BspServiceWorker<I extends WritableComparable, workerGraphPartitioner.updatePartitionOwners( getWorkerInfo(), masterSetPartitionOwners, getPartitionStore()); -/*if[HADOOP_NON_SECURE] - workerClient.setup(); -else[HADOOP_NON_SECURE]*/ + /*if[HADOOP_NON_SECURE] + workerClient.setup(); + else[HADOOP_NON_SECURE]*/ workerClient.setup(getConfiguration().authenticate()); -/*end[HADOOP_NON_SECURE]*/ + /*end[HADOOP_NON_SECURE]*/ + + // Initialize aggregator at worker side during setup. + // Do this just before vertex and edge loading. + aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); VertexEdgeCount vertexEdgeCount; http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 78cdd8e..c2c72c6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -59,6 +59,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, /** Class logger */ private static final Logger LOG = Logger.getLogger( EdgeInputSplitsCallable.class); + + /** Aggregator handler */ + private final WorkerThreadAggregatorUsage aggregatorUsage; /** Edge input format */ private final EdgeInputFormat<I, E> edgeInputFormat; /** Input split max edges (-1 denotes all) */ @@ -95,6 +98,9 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, this.edgeInputFormat = edgeInputFormat; inputSplitMaxEdges = configuration.getInputSplitMaxEdges(); + // Initialize aggregator usage. + this.aggregatorUsage = bspServiceWorker.getAggregatorHandler() + .newThreadAggregatorUsage(); edgeInputFilter = configuration.getEdgeInputFilter(); // Initialize Metrics @@ -125,7 +131,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, edgeReader.setConf( (ImmutableClassesGiraphConfiguration<I, Writable, E>) configuration); + edgeReader.initialize(inputSplit, context); + // Set aggregator usage to edge reader + edgeReader.setWorkerAggregatorUse(aggregatorUsage); long inputSplitEdgesLoaded = 0; long inputSplitEdgesFiltered = 0; http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 977e100..fb4fdf4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -129,7 +129,14 @@ public class VertexInputSplitsCallable<I extends WritableComparable, VertexReader<I, V, E> vertexReader = vertexInputFormat.createVertexReader(inputSplit, context); vertexReader.setConf(configuration); + + WorkerThreadAggregatorUsage aggregatorUsage = + this.bspServiceWorker + .getAggregatorHandler().newThreadAggregatorUsage(); + vertexReader.initialize(inputSplit, context); + // Set aggregator usage to vertex reader + vertexReader.setWorkerAggregatorUse(aggregatorUsage); long inputSplitVerticesLoaded = 0; long inputSplitVerticesFiltered = 0; http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java index db527f2..b054e9e 100644 --- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java @@ -19,14 +19,29 @@ package org.apache.giraph.examples; import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; import org.apache.giraph.graph.BasicComputation; import org.apache.giraph.master.DefaultMasterCompute; import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeReader; +import org.apache.giraph.io.VertexReader; +import org.apache.giraph.io.formats.GeneratedVertexInputFormat; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** Computation which uses aggrergators. To be used for testing. */ public class AggregatorsTestComputation extends @@ -37,6 +52,12 @@ public class AggregatorsTestComputation extends private static final String REGULAR_AGG = "regular"; /** Name of persistent aggregator */ private static final String PERSISTENT_AGG = "persistent"; + /** Name of input super step persistent aggregator */ + private static final String INPUT_VERTEX_PERSISTENT_AGG + = "input_super_step_vertex_agg"; + /** Name of input super step persistent aggregator */ + private static final String INPUT_EDGE_PERSISTENT_AGG + = "input_super_step_edge_agg"; /** Name of master overwriting aggregator */ private static final String MASTER_WRITE_AGG = "master"; /** Value which master compute will use */ @@ -92,6 +113,14 @@ public class AggregatorsTestComputation extends setAggregatedValue(MASTER_WRITE_AGG, myValue); long nv = getTotalNumVertices(); + if (superstep >= 0) { + assertEquals(100, ((LongWritable) + getAggregatedValue(INPUT_VERTEX_PERSISTENT_AGG)).get()); + } + if (superstep >= 0) { + assertEquals(4500, ((LongWritable) + getAggregatedValue(INPUT_EDGE_PERSISTENT_AGG)).get()); + } if (superstep > 0) { assertEquals(nv * (1L << (superstep - 1)), ((LongWritable) getAggregatedValue(REGULAR_AGG)).get()); @@ -111,6 +140,10 @@ public class AggregatorsTestComputation extends @Override public void initialize() throws InstantiationException, IllegalAccessException { + registerPersistentAggregator( + INPUT_VERTEX_PERSISTENT_AGG, LongSumAggregator.class); + registerPersistentAggregator( + INPUT_EDGE_PERSISTENT_AGG, LongSumAggregator.class); registerAggregator(REGULAR_AGG, LongSumAggregator.class); registerPersistentAggregator(PERSISTENT_AGG, LongSumAggregator.class); @@ -134,4 +167,125 @@ public class AggregatorsTestComputation extends ", actual: " + actual); } } + + /** + * Simple VertexReader + */ + public static class SimpleVertexReader extends + GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SimpleVertexReader.class); + + @Override + public boolean nextVertex() { + return totalRecords > recordsRead; + } + + @Override + public Vertex<LongWritable, DoubleWritable, + FloatWritable> getCurrentVertex() throws IOException { + Vertex<LongWritable, DoubleWritable, FloatWritable> vertex = + getConf().createVertex(); + LongWritable vertexId = new LongWritable( + (inputSplit.getSplitIndex() * totalRecords) + recordsRead); + DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d); + long targetVertexId = + (vertexId.get() + 1) % + (inputSplit.getNumSplits() * totalRecords); + float edgeValue = vertexId.get() * 100f; + List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList(); + edges.add(EdgeFactory.create(new LongWritable(targetVertexId), + new FloatWritable(edgeValue))); + vertex.initialize(vertexId, vertexValue, edges); + ++recordsRead; + if (LOG.isInfoEnabled()) { + LOG.info("next vertex: Return vertexId=" + vertex.getId().get() + + ", vertexValue=" + vertex.getValue() + + ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); + } + aggregate(INPUT_VERTEX_PERSISTENT_AGG, + new LongWritable((long) vertex.getValue().get())); + return vertex; + } + } + + /** + * Simple VertexInputFormat + */ + public static class SimpleVertexInputFormat extends + GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> { + @Override + public VertexReader<LongWritable, DoubleWritable, + FloatWritable> createVertexReader(InputSplit split, + TaskAttemptContext context) + throws IOException { + return new SimpleVertexReader(); + } + } + + /** + * Simple Edge Reader + */ + public static class SimpleEdgeReader extends + GeneratedEdgeReader<LongWritable, FloatWritable> { + /** Class logger */ + private static final Logger LOG = Logger.getLogger(SimpleEdgeReader.class); + + @Override + public boolean nextEdge() { + return totalRecords > recordsRead; + } + + @Override + public Edge<LongWritable, FloatWritable> getCurrentEdge() + throws IOException { + LongWritable vertexId = new LongWritable( + (inputSplit.getSplitIndex() * totalRecords) + recordsRead); + long targetVertexId = (vertexId.get() + 1) % + (inputSplit.getNumSplits() * totalRecords); + float edgeValue = vertexId.get() * 100f; + Edge<LongWritable, FloatWritable> edge = EdgeFactory.create( + new LongWritable(targetVertexId), new FloatWritable(edgeValue)); + ++recordsRead; + if (LOG.isInfoEnabled()) { + LOG.info("next edge: Return targetVertexId=" + targetVertexId + + ", edgeValue=" + edgeValue); + } + aggregate(INPUT_EDGE_PERSISTENT_AGG, new LongWritable((long) edge + .getValue().get())); + return edge; + } + + @Override + public LongWritable getCurrentSourceId() throws IOException, + InterruptedException { + LongWritable vertexId = new LongWritable( + (inputSplit.getSplitIndex() * totalRecords) + recordsRead); + return vertexId; + } + } + + /** + * Simple VertexInputFormat + */ + public static class SimpleEdgeInputFormat extends + EdgeInputFormat<LongWritable, FloatWritable> { + + @Override + public EdgeReader<LongWritable, FloatWritable> createEdgeReader( + InputSplit split, TaskAttemptContext context) throws IOException { + return new SimpleEdgeReader(); + } + + @Override + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) + throws IOException, InterruptedException { + List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); + for (int i = 0; i < minSplitCountHint; ++i) { + inputSplitList.add(new BspInputSplit(i, minSplitCountHint)); + } + return inputSplitList; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java new file mode 100644 index 0000000..14da5f2 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedEdgeReader.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import java.io.IOException; +import org.apache.giraph.bsp.BspInputSplit; +import org.apache.giraph.conf.LongConfOption; +import org.apache.giraph.io.EdgeReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Used by GeneratedEdgeInputFormat + * to read some generated data + * + * @param <I> Vertex index value + * @param <E> Edge value + */ +@SuppressWarnings("rawtypes") +public abstract class GeneratedEdgeReader< + I extends WritableComparable, + E extends Writable> + extends EdgeReader<I, E> { + /** Default edges produced by this reader */ + public static final LongConfOption DEFAULT_READER_EDGES = + new LongConfOption("GeneratedEdgeReader.reader_edges", 10); + /** Records read so far */ + protected long recordsRead = 0; + /** Total records to read (on this split alone) */ + protected long totalRecords = 0; + /** The input split from initialize(). */ + protected BspInputSplit inputSplit = null; + + /** + * Default constructor for reflection. + */ + public GeneratedEdgeReader() { + } + + @Override + public final void initialize(InputSplit inputSplit, + TaskAttemptContext context) throws IOException { + totalRecords = DEFAULT_READER_EDGES.get(getConf()); + this.inputSplit = (BspInputSplit) inputSplit; + } + + @Override + public void close() throws IOException { + } + + @Override + public final float getProgress() throws IOException { + return recordsRead * 100.0f / totalRecords; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/1eaddd18/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java index 6d22800..e2b611b 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java +++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java @@ -25,7 +25,6 @@ import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.examples.AggregatorsTestComputation; import org.apache.giraph.examples.SimpleCheckpoint; -import org.apache.giraph.examples.SimplePageRankComputation; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.hadoop.fs.Path; @@ -77,7 +76,9 @@ public class TestAggregatorsHandling extends BspCase { GiraphConfiguration conf = new GiraphConfiguration(); conf.setComputationClass(AggregatorsTestComputation.class); conf.setVertexInputFormatClass( - SimplePageRankComputation.SimplePageRankVertexInputFormat.class); + AggregatorsTestComputation.SimpleVertexInputFormat.class); + conf.setEdgeInputFormatClass( + AggregatorsTestComputation.SimpleEdgeInputFormat.class); GiraphJob job = prepareJob(getCallingMethodName(), conf); job.getConfiguration().setMasterComputeClass( AggregatorsTestComputation.AggregatorsTestMasterCompute.class); @@ -159,7 +160,9 @@ public class TestAggregatorsHandling extends BspCase { conf.setMasterComputeClass( AggregatorsTestComputation.AggregatorsTestMasterCompute.class); conf.setVertexInputFormatClass( - SimplePageRankComputation.SimplePageRankVertexInputFormat.class); + AggregatorsTestComputation.SimpleVertexInputFormat.class); + conf.setEdgeInputFormatClass( + AggregatorsTestComputation.SimpleEdgeInputFormat.class); GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath); GiraphConfiguration configuration = job.getConfiguration(); @@ -178,7 +181,9 @@ public class TestAggregatorsHandling extends BspCase { conf.setMasterComputeClass( AggregatorsTestComputation.AggregatorsTestMasterCompute.class); conf.setVertexInputFormatClass( - SimplePageRankComputation.SimplePageRankVertexInputFormat.class); + AggregatorsTestComputation.SimpleVertexInputFormat.class); + conf.setEdgeInputFormatClass( + AggregatorsTestComputation.SimpleEdgeInputFormat.class); GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted", conf, outputPath); job.getConfiguration().setMasterComputeClass(
