Repository: flink Updated Branches: refs/heads/master 3b69b2499 -> 9350264bd
[FLINK-2141][gelly] Allow GSA's Gather to perform this operation in IN, OUT or ALL directions [FLINK-2141][gelly] Added Test cases for GSAConfiguration setDirection. Made changes in coding style [FLINK-2141][gelly]Removed Example [FLINK-2141][gelly] Corrected Annotation and Gelly guide Minor Changes in Gelly Guide This closes #877 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9350264b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9350264b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9350264b Branch: refs/heads/master Commit: 9350264bdda2b4a5e230c4621a612e38a0f77be1 Parents: 3b69b24 Author: Shivani <[email protected]> Authored: Wed Jul 1 03:03:58 2015 +0200 Committer: andra <[email protected]> Committed: Sun Jul 12 11:18:27 2015 +0200 ---------------------------------------------------------------------- docs/libs/gelly_guide.md | 20 +++ .../flink/graph/gsa/GSAConfiguration.java | 23 +++ .../graph/gsa/GatherSumApplyIteration.java | 47 +++++- .../test/GatherSumApplyConfigurationITCase.java | 157 +++++++++++++++++++ 4 files changed, 244 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/docs/libs/gelly_guide.md ---------------------------------------------------------------------- diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md index 1e9bf48..e0e05b3 100644 --- a/docs/libs/gelly_guide.md +++ b/docs/libs/gelly_guide.md @@ -693,6 +693,9 @@ Currently, the following parameters can be specified: * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method. The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. 
If the option is not set in the configuration, this method will return -1. +* <strong>Neighbor Direction</strong>: By default (`EdgeDirection.OUT`), values are propagated along the out-edges, i.e. each vertex gathers the values of its in-neighbors. This can be changed to `IN` or `ALL` +using the `setDirection()` method. + The following example illustrates the usage of the number of vertices option. {% highlight java %} @@ -734,6 +737,23 @@ public static final class Apply { {% endhighlight %} +The following example illustrates the usage of the edge direction option. +{% highlight java %} + +Graph<Long, HashSet<Long>, Double> graph = ... + +// configure the iteration +GSAConfiguration parameters = new GSAConfiguration(); + +// set the direction in which neighbor values are gathered +parameters.setDirection(EdgeDirection.IN); + +// run the gather-sum-apply iteration, also passing the configuration parameters +DataSet<Vertex<Long, HashSet<Long>>> result = + graph.runGatherSumApplyIteration( + new Gather(), new Sum(), new Apply(), maxIterations, parameters) + .getVertices(); +{% endhighlight %} [Back to top](#top) ### Vertex-centric and GSA Comparison http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java index de47280..8d24f16 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.gsa; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.IterationConfiguration; import java.util.ArrayList; @@ -46,6 +47,8 @@ public class
GSAConfiguration extends IterationConfiguration { /** the broadcast variables for the apply function **/ private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>(); + private EdgeDirection direction = EdgeDirection.OUT; + public GSAConfiguration() {} /** @@ -107,4 +110,24 @@ public class GSAConfiguration extends IterationConfiguration { public List<Tuple2<String, DataSet<?>>> getApplyBcastVars() { return this.bcVarsApply; } + + /** + * Gets the direction in which neighbor values are gathered. + * By default ({@link EdgeDirection#OUT}) values flow along the out-edges, so each vertex gathers from its in-neighbors. + * + * @return an EdgeDirection, which can be either IN, OUT or ALL. + */ + public EdgeDirection getDirection() { + return direction; + } + + /** + * Sets the direction in which neighbor values are gathered. + * By default ({@link EdgeDirection#OUT}) values flow along the out-edges, so each vertex gathers from its in-neighbors. + * + * @param direction the gather direction: IN, OUT or ALL + */ + public void setDirection(EdgeDirection direction) { + this.direction = direction; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index 389cf02..4c91089 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -39,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; @@ -64,6 +65,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati private final SumFunction<VV, EV, M> sum; private final ApplyFunction<K, VV, M> apply; private final int maximumNumberOfIterations; + private EdgeDirection direction = EdgeDirection.OUT; private GSAConfiguration configuration; @@ -163,9 +165,34 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati } // Prepare the neighbors - DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration + if(this.configuration != null) { + direction = this.configuration.getDirection(); + } + DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors; + switch(direction) { + case OUT: + neighbors = iteration + .getWorkset().join(edgeDataSet) + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()); + break; + case IN: + neighbors = iteration .getWorkset().join(edgeDataSet) - .where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>()); + .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()); + break; + case ALL: + neighbors = iteration + .getWorkset().join(edgeDataSet) + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration + .getWorkset().join(edgeDataSet) + .where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>())); + break; + default: + neighbors = iteration + .getWorkset().join(edgeDataSet) + .where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()); + break; + } // Gather, sum and apply MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf); @@ -358,7 +385,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati @SuppressWarnings("serial") @ForwardedFieldsSecond("f1->f0") - private static final class ProjectKeyWithNeighbor<K, VV, EV> implements FlatJoinFunction< + private static final class ProjectKeyWithNeighborOUT<K, VV, EV> 
implements FlatJoinFunction< Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> { public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) { @@ -367,6 +394,20 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati } } + @SuppressWarnings("serial") + @ForwardedFieldsSecond({"f0"}) + private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction< + Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> { + + public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) { + out.collect(new Tuple2<K, Neighbor<VV, EV>>( + edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue()))); + } + } + + + + /** * Configures this gather-sum-apply iteration with the provided parameters. * http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index 701eda9..53455c5 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.gsa.ApplyFunction; @@ 
-38,6 +40,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.HashSet; import java.util.List; @RunWith(Parameterized.class) @@ -118,6 +121,117 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase compareResultAsTuples(result, expectedResult); } + @Test + public void testIterationDefaultDirection() throws Exception { + + /* + * Test that if no direction parameter is given, the iteration works as before + * (i.e. it gathers information from the IN edges and neighbors and the information is calculated for an OUT edge + * Default direction parameter is OUT for the GatherSumApplyIterations) + * When data is gathered from the IN edges the Gather Sum and Apply functions + * set the set of vertices which have path to a vertex as the value of that vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges(); + + edges.remove(0); + + Graph<Long, HashSet<Long>, Long> graph = Graph + .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env) + .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper()); + + DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration( + new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4) + .getVertices(); + + List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); + + expectedResult = "1,[1, 2, 3, 4, 5]\n" + +"2,[2]\n" + +"3,[1, 2, 3, 4, 5]\n" + +"4,[1, 2, 3, 4, 5]\n" + +"5,[1, 2, 3, 4, 5]\n"; + + compareResultAsTuples(result, expectedResult); + } + + @Test + public void testIterationDirectionIN() throws Exception { + + /* + * Test that if the direction parameter IN is given, the iteration works as expected + * (i.e. 
it gathers information from the OUT edges and neighbors and the information is calculated for an IN edge + * When data is gathered from the OUT edges the Gather Sum and Apply functions + * set the set of vertices reachable from a vertex as the value of that vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + GSAConfiguration parameters = new GSAConfiguration(); + + parameters.setDirection(EdgeDirection.IN); + + List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges(); + + edges.remove(0); + + Graph<Long, HashSet<Long>, Long> graph = Graph + .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env) + .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper()); + + DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration( + new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4, + parameters) + .getVertices(); + List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); + + expectedResult = "1,[1, 3, 4, 5]\n" + +"2,[1, 2, 3, 4, 5]\n" + +"3,[1, 3, 4, 5]\n" + +"4,[1, 3, 4, 5]\n" + +"5,[1, 3, 4, 5]\n"; + + compareResultAsTuples(result, expectedResult); + } + + @Test + public void testIterationDirectionALL() throws Exception { + + /* + * Test that if the direction parameter ALL is given, the iteration works as expected + * (i.e.
it gathers information from both IN and OUT edges and neighbors + * When data is gathered from the ALL edges the Gather Sum and Apply functions + * set the set of vertices which are connected to a Vertex through some path as value of that vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + GSAConfiguration parameters = new GSAConfiguration(); + parameters.setDirection(EdgeDirection.ALL); + + List<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdges(); + + edges.remove(0); + + Graph<Long, HashSet<Long>, Long> graph = Graph + .fromCollection(TestGraphUtils.getLongLongVertices(), edges, env) + .mapVertices(new GatherSumApplyConfigurationITCase.InitialiseHashSetMapper()); + + DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = graph.runGatherSumApplyIteration( + new GetReachableVertices(), new FindAllReachableVertices(), new UpdateReachableVertices(), 4, + parameters) + .getVertices(); + + List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect(); + + expectedResult = "1,[1, 2, 3, 4, 5]\n" + +"2,[1, 2, 3, 4, 5]\n" + +"3,[1, 2, 3, 4, 5]\n" + +"4,[1, 2, 3, 4, 5]\n" + +"5,[1, 2, 3, 4, 5]\n"; + + compareResultAsTuples(result, expectedResult); + } + @SuppressWarnings("serial") private static final class Gather extends GatherFunction<Long, Long, Long> { @@ -243,4 +357,47 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase return 1l; } } + + @SuppressWarnings("serial") + public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> { + + @Override + public HashSet<Long> map(Vertex<Long, Long> value) throws Exception { + HashSet<Long> h = new HashSet<Long>(); + h.add(value.getId()); + return h; + } + } + + private static final class GetReachableVertices extends GatherFunction<HashSet<Long>, Long, HashSet<Long>> { + + @Override + public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> neighbor) { + return neighbor.getNeighborValue(); 
+ } + } + + private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> { + @Override + public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) { + HashSet<Long> set = currentSet; + for(Long l : newSet) { + set.add(l); + } + return set; + } + } + + private static final class UpdateReachableVertices extends ApplyFunction<Long, HashSet<Long>, HashSet<Long>> { + + @Override + public void apply(HashSet<Long> newValue, HashSet<Long> currentValue) { + newValue.addAll(currentValue); + if(newValue.size()>currentValue.size()) { + setResult(newValue); + } + } + } + + }
