Updated Branches: refs/heads/trunk 56fcb519a -> 2430ec5f2
GIRAPH-579 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2430ec5f Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2430ec5f Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2430ec5f Branch: refs/heads/trunk Commit: 2430ec5f231f572b780dc0cb0053f409358df155 Parents: 56fcb51 Author: Alessandro Presta <[email protected]> Authored: Thu Mar 21 11:43:07 2013 -0700 Committer: Alessandro Presta <[email protected]> Committed: Tue Mar 26 14:52:15 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 3 + .../java/org/apache/giraph/conf/GiraphClasses.java | 32 ++++- .../apache/giraph/conf/GiraphConfiguration.java | 12 ++ .../org/apache/giraph/conf/GiraphConstants.java | 2 + .../conf/ImmutableClassesGiraphConfiguration.java | 40 +++++ .../java/org/apache/giraph/edge/EdgeStore.java | 116 +++++++++++---- .../giraph/job/GiraphConfigurationValidator.java | 21 ++- .../apache/giraph/utils/ConfigurationUtils.java | 5 + .../apache/giraph/utils/InternalVertexRunner.java | 1 + .../java/org/apache/giraph/io/TestEdgeInput.java | 62 +++++++- 10 files changed, 256 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index b695ae0..674d15e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-579: Make it possible to use different out-edges data structures + for input and computation (apresta) + GIRAPH-582: Create a generic option for determining the number of supersteps that a job runs for (aching) http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java index d67d3a5..c13f3a2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java @@ -71,6 +71,8 @@ public class GiraphClasses<I extends WritableComparable, protected Class<M> messageValueClass; /** Vertex edges class - cached for fast access */ protected Class<? extends VertexEdges<I, E>> vertexEdgesClass; + /** Input vertex edges class - cached for fast access */ + protected Class<? extends VertexEdges<I, E>> inputVertexEdgesClass; /** Vertex value factory class - cached for fast access */ protected Class<? extends VertexValueFactory<V>> vertexValueFactoryClass; @@ -114,6 +116,8 @@ public class GiraphClasses<I extends WritableComparable, // downcast. vertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object) ByteArrayEdges.class; + inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>) (Object) + ByteArrayEdges.class; vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>) (Object) DefaultVertexValueFactory.class; graphPartitionerFactoryClass = @@ -156,9 +160,12 @@ public class GiraphClasses<I extends WritableComparable, vertexEdgesClass = (Class<? extends VertexEdges<I, E>>) conf.getClass(VERTEX_EDGES_CLASS, ByteArrayEdges.class, VertexEdges.class); + inputVertexEdgesClass = (Class<? extends VertexEdges<I, E>>) + conf.getClass(INPUT_VERTEX_EDGES_CLASS, vertexEdgesClass, + VertexEdges.class); vertexValueFactoryClass = (Class<? extends VertexValueFactory<V>>) conf.getClass(VERTEX_VALUE_FACTORY_CLASS, - DefaultVertexValueFactory.class); + DefaultVertexValueFactory.class, VertexValueFactory.class); graphPartitionerFactoryClass = (Class<? extends GraphPartitionerFactory<I, V, E, M>>) @@ -248,6 +255,15 @@ public class GiraphClasses<I extends WritableComparable, return vertexEdgesClass; } + /* Get Vertex edges class used during edge-based input + * + * @return Vertex edges class. + */ + public Class<? extends VertexEdges<I, E>> getInputVertexEdgesClass() { + return inputVertexEdgesClass; + } + + /** * Get vertex value factory class * @@ -519,6 +535,20 @@ public class GiraphClasses<I extends WritableComparable, } /** + * Set VertexEdges class used during edge-input (if different from the one + * used for computation) + * + * @param inputVertexEdgesClass Input vertex edges class to set + * @return this + */ + public GiraphClasses setInputVertexEdgesClass( + Class<? extends VertexEdges> inputVertexEdgesClass) { + this.inputVertexEdgesClass = + (Class<? extends VertexEdges<I, E>>) inputVertexEdgesClass; + return this; + } + + /** * Set VertexValueFactory class held * * @param vertexValueFactoryClass Vertex value factory class to set http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index 3b84831..ffcae6e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -97,6 +97,18 @@ public class GiraphConfiguration extends Configuration } /** + * Set the vertex edges class used during edge-based input (if different + * from the one used during computation) + * + * @param inputVertexEdgesClass Determines the way edges are stored + */ + public final void setInputVertexEdgesClass( + Class<? extends VertexEdges> inputVertexEdgesClass) { + setClass(INPUT_VERTEX_EDGES_CLASS, inputVertexEdgesClass, + VertexEdges.class); + } + + /** * Set the vertex input format class (required) * * @param vertexInputFormatClass Determines how graph is input http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 42f8abc..7882d06 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -28,6 +28,8 @@ public interface GiraphConstants { String VERTEX_VALUE_FACTORY_CLASS = "giraph.vertexValueFactoryClass"; /** Vertex edges class - optional */ String VERTEX_EDGES_CLASS = "giraph.vertexEdgesClass"; + /** Vertex edges class to be used during edge input only - optional */ + String INPUT_VERTEX_EDGES_CLASS = "giraph.inputVertexEdgesClass"; /** Class for Master - optional */ String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass"; http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 2a3466d..de85ab6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -575,6 +575,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Get the user's subclassed {@link VertexEdges} used for input + * + * @return User's input vertex edges class + */ + public Class<? extends VertexEdges<I, E>> getInputVertexEdgesClass() { + return classes.getInputVertexEdgesClass(); + } + + /** + * Check whether the user has specified a different {@link VertexEdges} + * class to be used during edge-based input. + * + * @return True iff there is a special edges class for input + */ + public boolean useInputVertexEdges() { + return classes.getInputVertexEdgesClass() != classes.getVertexEdgesClass(); + } + + /** * True if the {@link VertexEdges} implementation copies the passed edges * to its own data structure, i.e. it doesn't keep references to Edge * objects, target vertex ids or edge values passed to add() or @@ -639,6 +658,27 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create a user {@link VertexEdges} used during edge-based input + * + * @return Instantiated user input VertexEdges + */ + public VertexEdges<I, E> createInputVertexEdges() { + return ReflectionUtils.newInstance(getInputVertexEdgesClass(), this); + } + + /** + * Create an input {@link VertexEdges} instance and initialize it with the + * default capacity. + * + * @return Instantiated input VertexEdges + */ + public VertexEdges<I, E> createAndInitializeInputVertexEdges() { + VertexEdges<I, E> vertexEdges = createInputVertexEdges(); + vertexEdges.initialize(); + return vertexEdges; + } + + /** * Create a partition * * @param id Partition id http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java index 234c267..3101211 100644 --- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java @@ -19,18 +19,24 @@ package org.apache.giraph.edge; import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.partition.Partition; import org.apache.giraph.utils.ByteArrayVertexIdEdges; -import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.ProgressableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; -import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Collects incoming edges for vertices owned by this worker. @@ -58,6 +64,11 @@ public class EdgeStore<I extends WritableComparable, * reuse. */ private boolean reuseEdgeObjects; + /** + * Whether the {@link VertexEdges} class used during input is different + * from the one used during computation. + */ + private boolean useInputVertexEdges; /** * Constructor. @@ -76,6 +87,7 @@ public class EdgeStore<I extends WritableComparable, transientEdges = new MapMaker().concurrencyLevel( configuration.getNettyServerExecutionConcurrency()).makeMap(); reuseEdgeObjects = configuration.reuseEdgeObjects(); + useInputVertexEdges = configuration.useInputVertexEdges(); } /** @@ -110,7 +122,7 @@ public class EdgeStore<I extends WritableComparable, VertexEdges<I, E> vertexEdges = partitionEdges.get(vertexId); if (vertexEdges == null) { VertexEdges<I, E> newVertexEdges = - configuration.createAndInitializeVertexEdges(); + configuration.createAndInitializeInputVertexEdges(); vertexEdges = partitionEdges.putIfAbsent(vertexId, newVertexEdges); if (vertexEdges == null) { vertexEdges = newVertexEdges; @@ -126,6 +138,27 @@ public class EdgeStore<I extends WritableComparable, } /** + * Convert the input edges to the {@link VertexEdges} data structure used + * for computation (if different). + * + * @param inputEdges Input edges + * @return Compute edges + */ + private VertexEdges<I, E> convertInputToComputeEdges( + VertexEdges<I, E> inputEdges) { + if (!useInputVertexEdges) { + return inputEdges; + } else { + VertexEdges<I, E> computeEdges = + configuration.createAndInitializeVertexEdges(inputEdges.size()); + for (Edge<I, E> edge : inputEdges) { + computeEdges.add(edge); + } + return computeEdges; + } + } + + /** * Move all edges from temporary storage to their source vertices. * Note: this method is not thread-safe. */ @@ -133,37 +166,62 @@ public class EdgeStore<I extends WritableComparable, if (LOG.isInfoEnabled()) { LOG.info("moveEdgesToVertices: Moving incoming edges to vertices."); } - for (Map.Entry<Integer, ConcurrentMap<I, - VertexEdges<I, E>>> partitionEdges : transientEdges.entrySet()) { - Partition<I, V, E, M> partition = - service.getPartitionStore().getPartition(partitionEdges.getKey()); - for (I vertexId : partitionEdges.getValue().keySet()) { - VertexEdges<I, E> vertexEdges = - partitionEdges.getValue().remove(vertexId); - Vertex<I, V, E, M> vertex = partition.getVertex(vertexId); - // If the source vertex doesn't exist, create it. Otherwise, - // just set the edges. - if (vertex == null) { - vertex = configuration.createVertex(); - vertex.initialize(vertexId, configuration.createVertexValue(), - vertexEdges); - partition.putVertex(vertex); - } else { - vertex.setEdges(vertexEdges); - // Some Partition implementations (e.g. ByteArrayPartition) require - // us to put back the vertex after modifying it. - partition.saveVertex(vertex); + + final BlockingQueue<Integer> partitionIdQueue = + new ArrayBlockingQueue<Integer>(transientEdges.size()); + partitionIdQueue.addAll(transientEdges.keySet()); + int numThreads = configuration.getNumInputSplitsThreads(); + ExecutorService movePartitionExecutor = + Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder().setNameFormat("move-edges-%d").build()); + + for (int i = 0; i < numThreads; ++i) { + Callable moveCallable = new Callable<Void>() { + @Override + public Void call() throws Exception { + Integer partitionId; + while ((partitionId = partitionIdQueue.poll()) != null) { + Partition<I, V, E, M> partition = + service.getPartitionStore().getPartition(partitionId); + ConcurrentMap<I, VertexEdges<I, E>> partitionEdges = + transientEdges.remove(partitionId); + for (I vertexId : partitionEdges.keySet()) { + VertexEdges<I, E> vertexEdges = convertInputToComputeEdges( + partitionEdges.remove(vertexId)); + Vertex<I, V, E, M> vertex = partition.getVertex(vertexId); + // If the source vertex doesn't exist, create it. Otherwise, + // just set the edges. + if (vertex == null) { + vertex = configuration.createVertex(); + vertex.initialize(vertexId, configuration.createVertexValue(), + vertexEdges); + partition.putVertex(vertex); + } else { + vertex.setEdges(vertexEdges); + // Some Partition implementations (e.g. ByteArrayPartition) + // require us to put back the vertex after modifying it. + partition.saveVertex(vertex); + } + } + // Some PartitionStore implementations + // (e.g. DiskBackedPartitionStore) require us to put back the + // partition after modifying it. + service.getPartitionStore().putPartition(partition); + } + return null; } - progressable.progress(); - } - // Some PartitionStore implementations (e.g. DiskBackedPartitionStore) - // require us to put back the partition after modifying it. - service.getPartitionStore().putPartition(partition); + }; + movePartitionExecutor.submit(moveCallable); } + + movePartitionExecutor.shutdown(); + ProgressableUtils.awaitExecutorTermination(movePartitionExecutor, + progressable); + transientEdges.clear(); + if (LOG.isInfoEnabled()) { LOG.info("moveEdgesToVertices: Finished moving incoming edges to " + "vertices."); } - transientEdges.clear(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java index eace06f..1e05773 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java @@ -170,10 +170,13 @@ public class GiraphConfigurationValidator<I extends WritableComparable, } } - /** Verify matching generic types in VertexEdges. */ - private void verifyVertexEdgesGenericTypes() { - Class<? extends VertexEdges<I, E>> vertexEdgesClass = - conf.getVertexEdgesClass(); + /** + * Verify matching generic types for a specific VertexEdges class. + * + * @param vertexEdgesClass {@link VertexEdges} class to check + */ + private void verifyVertexEdgesGenericTypesClass( + Class<? extends VertexEdges<I, E>> vertexEdgesClass) { List<Class<?>> classList = ReflectionUtils.getTypeArguments( VertexEdges.class, vertexEdgesClass); // VertexEdges implementations can be generic, in which case there are no @@ -198,6 +201,16 @@ public class GiraphConfigurationValidator<I extends WritableComparable, } } + /** Verify matching generic types in VertexEdges. */ + private void verifyVertexEdgesGenericTypes() { + Class<? extends VertexEdges<I, E>> vertexEdgesClass = + conf.getVertexEdgesClass(); + Class<? extends VertexEdges<I, E>> inputVertexEdgesClass = + conf.getInputVertexEdgesClass(); + verifyVertexEdgesGenericTypesClass(vertexEdgesClass); + verifyVertexEdgesGenericTypesClass(inputVertexEdgesClass); + } + /** Verify matching generic types in VertexInputFormat. */ private void verifyVertexInputFormatGenericTypes() { Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java index cb2a2f7..bd30455 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java @@ -203,6 +203,11 @@ public final class ConfigurationUtils { (Class<? extends VertexEdges>) Class.forName(cmd.getOptionValue("ve"))); } + if (cmd.hasOption("ive")) { + giraphConfiguration.setInputVertexEdgesClass( + (Class<? extends VertexEdges>) + Class.forName(cmd.getOptionValue("ive"))); + } if (cmd.hasOption("wc")) { giraphConfiguration.setWorkerContextClass( (Class<? extends WorkerContext>) http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index abf6950..2bba672 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -126,6 +126,7 @@ public class InternalVertexRunner { GiraphConfiguration conf = job.getConfiguration(); conf.setVertexClass(classes.getVertexClass()); conf.setVertexEdgesClass(classes.getVertexEdgesClass()); + conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass()); conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass()); if (classes.hasVertexInputFormat()) { conf.setVertexInputFormatClass(classes.getVertexInputFormatClass()); http://git-wip-us.apache.org/repos/asf/giraph/blob/2430ec5f/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java index bfc7e8b..0dcefd9 100644 --- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java +++ b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java @@ -23,14 +23,15 @@ import com.google.common.collect.Maps; import org.apache.giraph.BspCase; import org.apache.giraph.conf.GiraphClasses; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexValueFactory; import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat; import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat; import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat; import org.apache.giraph.utils.InternalVertexRunner; -import org.apache.giraph.edge.ByteArrayEdges; -import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.junit.Test; @@ -39,6 +40,8 @@ import java.io.IOException; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * A test case to ensure that loading a graph from a list of edges works as @@ -183,6 +186,38 @@ public class TestEdgeInput extends BspCase { assertEquals(1, (int) values.get(5)); } + // It should use the specified input VertexEdges class. + @Test + public void testDifferentInputEdgesClass() throws Exception { + String[] edges = new String[] { + "1 2", + "2 3", + "2 4", + "4 1" + }; + + GiraphClasses classes = new GiraphClasses(); + classes.setVertexClass(TestVertexCheckEdgesType.class); + classes.setVertexEdgesClass(ByteArrayEdges.class); + classes.setInputVertexEdgesClass(TestVertexEdgesFilterEven.class); + classes.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); + classes.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + Map<String, String> params = ImmutableMap.of(); + Iterable<String> results = InternalVertexRunner.run(classes, params, + null, edges); + + Map<Integer, Integer> values = parseResults(results); + + // Check that all vertices with outgoing edges in the input have been + // created + assertEquals(3, values.size()); + // Check the number of edges for each vertex (edges with odd target id + // should have been removed) + assertEquals(1, (int) values.get(1)); + assertEquals(1, (int) values.get(2)); + assertEquals(0, (int) values.get(4)); + } + public static class TestVertexWithNumEdges extends Vertex<IntWritable, IntWritable, NullWritable, NullWritable> { @Override @@ -192,8 +227,17 @@ public class TestEdgeInput extends BspCase { } } + public static class TestVertexCheckEdgesType extends TestVertexWithNumEdges { + @Override + public void compute(Iterable<NullWritable> messages) throws IOException { + assertFalse(getEdges() instanceof TestVertexEdgesFilterEven); + assertTrue(getEdges() instanceof ByteArrayEdges); + super.compute(messages); + } + } + public static class TestVertexDoNothing extends Vertex<IntWritable, - IntWritable, NullWritable, NullWritable> { + IntWritable, NullWritable, NullWritable> { @Override public void compute(Iterable<NullWritable> messages) throws IOException { voteToHalt(); @@ -204,7 +248,7 @@ public class TestEdgeInput extends BspCase { implements VertexValueFactory<IntWritable> { @Override public void initialize(ImmutableClassesGiraphConfiguration<?, IntWritable, - ?, ?> configuration) { } + ?, ?> configuration) { } @Override public IntWritable createVertexValue() { @@ -212,6 +256,16 @@ public class TestEdgeInput extends BspCase { } } + public static class TestVertexEdgesFilterEven + extends ByteArrayEdges<IntWritable, NullWritable> { + @Override + public void add(Edge<IntWritable, NullWritable> edge) { + if (edge.getTargetVertexId().get() % 2 == 0) { + super.add(edge); + } + } + } + private static Map<Integer, Integer> parseResults(Iterable<String> results) { Map<Integer, Integer> values = Maps.newHashMap(); for (String line : results) {
