Repository: flink
Updated Branches:
  refs/heads/master 3b69b2499 -> 9350264bd


[FLINK-2141][gelly] Allow GSA's gather phase to collect neighbor values in the
IN, OUT or ALL direction

[FLINK-2141][gelly] Added test cases for GSAConfiguration#setDirection(). Made
coding style changes

[FLINK-2141][gelly] Removed example

[FLINK-2141][gelly] Corrected Annotation and Gelly guide

Minor Changes in Gelly Guide

This closes #877


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9350264b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9350264b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9350264b

Branch: refs/heads/master
Commit: 9350264bdda2b4a5e230c4621a612e38a0f77be1
Parents: 3b69b24
Author: Shivani <[email protected]>
Authored: Wed Jul 1 03:03:58 2015 +0200
Committer: andra <[email protected]>
Committed: Sun Jul 12 11:18:27 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  20 +++
 .../flink/graph/gsa/GSAConfiguration.java       |  23 +++
 .../graph/gsa/GatherSumApplyIteration.java      |  47 +++++-
 .../test/GatherSumApplyConfigurationITCase.java | 157 +++++++++++++++++++
 4 files changed, 244 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 1e9bf48..e0e05b3 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -693,6 +693,9 @@ Currently, the following parameters can be specified:
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices 
within the iteration. This property can be set using the `setOptNumVertices()` 
method.
 The number of vertices can then be accessed in the gather, sum and/or apply 
functions by using the `getNumberOfVertices()` method. If the option is not set 
in the configuration, this method will return -1.
 
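+* <strong>Neighbor Direction</strong>: By default (`EdgeDirection.OUT`), vertex values are propagated along the out-going edges, so each vertex gathers the values of its in-neighbors.
+Use the `setDirection()` method to change this: `EdgeDirection.IN` gathers the values of the out-neighbors and `EdgeDirection.ALL` gathers from all neighbors.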
+
 The following example illustrates the usage of the number of vertices option.
 
 {% highlight java %}
@@ -734,6 +737,23 @@ public static final class Apply {
 
 {% endhighlight %}
 
+The following example illustrates the usage of the edge direction option.
+{% highlight java %}
+
+Graph<Long, HashSet<Long>, Double> graph = ...
+
+// configure the iteration
+GSAConfiguration parameters = new GSAConfiguration();
+
+// set the direction in which neighbor values are gathered
+parameters.setDirection(EdgeDirection.IN);
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+DataSet<Vertex<Long, HashSet<Long>>> result =
+                       graph.runGatherSumApplyIteration(
+                       new Gather(), new Sum(), new Apply(), maxIterations, parameters)
+                       .getVertices();
+{% endhighlight %}
 [Back to top](#top)
 
 ### Vertex-centric and GSA Comparison

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index de47280..8d24f16 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.gsa;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.IterationConfiguration;
 
 import java.util.ArrayList;
@@ -46,6 +47,8 @@ public class GSAConfiguration extends IterationConfiguration {
        /** the broadcast variables for the apply function **/
        private List<Tuple2<String, DataSet<?>>> bcVarsApply = new 
ArrayList<Tuple2<String,DataSet<?>>>();
 
+       private EdgeDirection direction = EdgeDirection.OUT;
+
        public GSAConfiguration() {}
 
        /**
@@ -107,4 +110,24 @@ public class GSAConfiguration extends 
IterationConfiguration {
        public List<Tuple2<String, DataSet<?>>> getApplyBcastVars() {
                return this.bcVarsApply;
        }
+
+       /**
+        * Gets the direction from which the neighbors are to be selected.
+        * By default ({@link EdgeDirection#OUT}), each vertex gathers the values of its in-neighbors.
+        *
+        * @return an EdgeDirection, which can be either IN, OUT or ALL.
+        */
+       public EdgeDirection getDirection() {
+               return direction;
+       }
+
+       /**
+        * Sets the direction in which neighbors are to be selected.
+        * By default ({@link EdgeDirection#OUT}), each vertex gathers the values of its in-neighbors.
+        *
+        * @param direction - IN, OUT or ALL
+        */
+       public void setDirection(EdgeDirection direction) {
+               this.direction = direction;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 389cf02..4c91089 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
@@ -64,6 +65,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements 
CustomUnaryOperati
        private final SumFunction<VV, EV, M> sum;
        private final ApplyFunction<K, VV, M> apply;
        private final int maximumNumberOfIterations;
+       private EdgeDirection direction = EdgeDirection.OUT;
 
        private GSAConfiguration configuration;
 
@@ -163,9 +165,34 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                }
 
                // Prepare the neighbors
-               DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+               if(this.configuration != null) {
+                       direction = this.configuration.getDirection();
+               }
+               DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
+               switch(direction) {
+                       case OUT:
+                               neighbors = iteration
+                               .getWorkset().join(edgeDataSet)
+                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>());
+                               break;
+                       case IN:
+                               neighbors = iteration
                                .getWorkset().join(edgeDataSet)
-                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighbor<K, VV, EV>());
+                               .where(0).equalTo(1).with(new 
ProjectKeyWithNeighborIN<K, VV, EV>());
+                               break;
+                       case ALL:
+                               neighbors =  iteration
+                                               .getWorkset().join(edgeDataSet)
+                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+                                                               
.getWorkset().join(edgeDataSet)
+                                                               
.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+                               break;
+                       default:
+                               neighbors = iteration
+                                               .getWorkset().join(edgeDataSet)
+                                               .where(0).equalTo(0).with(new 
ProjectKeyWithNeighborOUT<K, VV, EV>());
+                               break;
+               }
 
                // Gather, sum and apply
                MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> 
gatherMapOperator = neighbors.map(gatherUdf);
@@ -358,7 +385,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
 
        @SuppressWarnings("serial")
        @ForwardedFieldsSecond("f1->f0")
-       private static final class ProjectKeyWithNeighbor<K, VV, EV> implements 
FlatJoinFunction<
+       private static final class ProjectKeyWithNeighborOUT<K, VV, EV> 
implements FlatJoinFunction<
                        Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, 
EV>>> {
 
                public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, 
Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
@@ -367,6 +394,20 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                }
        }
 
+       @SuppressWarnings("serial")
+       @ForwardedFieldsSecond({"f0"})
+       private static final class ProjectKeyWithNeighborIN<K, VV, EV> 
implements FlatJoinFunction<
+                       Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, 
EV>>> {
+
+               public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, 
Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+                       out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+                                       edge.getSource(), new Neighbor<VV, 
EV>(vertex.getValue(), edge.getValue())));
+               }
+       }
+
+
+
+
        /**
         * Configures this gather-sum-apply iteration with the provided 
parameters.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/9350264b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 701eda9..53455c5 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
@@ -38,6 +40,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.HashSet;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -118,6 +121,117 @@ public class GatherSumApplyConfigurationITCase extends 
MultipleProgramsTestBase
                compareResultAsTuples(result, expectedResult);
        }
 
+       @Test
+       public void testIterationDefaultDirection() throws Exception {
+
+               /*
+                * Test that if no direction parameter is given, the iteration works as before,
+                * i.e. with the default direction OUT, each vertex gathers information from its
+                * in-neighbors (values are propagated along the out-going edges).
+                * With this direction, the Gather, Sum and Apply functions set the value of
+                * each vertex to the set of vertices from which it can be reached.
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdges();
+
+               edges.remove(0);
+
+               Graph<Long, HashSet<Long>, Long> graph = Graph
+                               
.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+                               .mapVertices(new 
GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+               DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = 
graph.runGatherSumApplyIteration(
+                               new GetReachableVertices(), new 
FindAllReachableVertices(), new UpdateReachableVertices(), 4)
+                               .getVertices();
+
+               List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
+
+               expectedResult = "1,[1, 2, 3, 4, 5]\n"
+                                               +"2,[2]\n"
+                                               +"3,[1, 2, 3, 4, 5]\n"
+                                               +"4,[1, 2, 3, 4, 5]\n"
+                                               +"5,[1, 2, 3, 4, 5]\n";
+
+               compareResultAsTuples(result, expectedResult);
+       }
+
+       @Test
+       public void testIterationDirectionIN() throws Exception {
+
+               /*
+                * Test that if the direction parameter IN is given, the iteration works as expected,
+                * i.e. each vertex gathers information from its out-neighbors.
+                * With this direction, the Gather, Sum and Apply functions set the value of
+                * each vertex to the set of vertices that can be reached from it.
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               GSAConfiguration parameters = new GSAConfiguration();
+
+               parameters.setDirection(EdgeDirection.IN);
+
+               List<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdges();
+
+               edges.remove(0);
+
+               Graph<Long, HashSet<Long>, Long> graph = Graph
+                               
.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+                               .mapVertices(new 
GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+               DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = 
graph.runGatherSumApplyIteration(
+                               new GetReachableVertices(), new 
FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+                                                                               
                                                                                
                                parameters)
+                               .getVertices();
+               List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
+
+               expectedResult = "1,[1, 3, 4, 5]\n"
+                               +"2,[1, 2, 3, 4, 5]\n"
+                               +"3,[1, 3, 4, 5]\n"
+                               +"4,[1, 3, 4, 5]\n"
+                               +"5,[1, 3, 4, 5]\n";
+
+               compareResultAsTuples(result, expectedResult);
+       }
+
+       @Test
+       public void testIterationDirectionALL() throws Exception {
+
+               /*
+                * Test that if the direction parameter ALL is given, the iteration works as expected,
+                * i.e. each vertex gathers information from both its in- and out-neighbors.
+                * With this direction, the Gather, Sum and Apply functions set the value of
+                * each vertex to the set of vertices connected to it through some undirected path.
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               GSAConfiguration parameters = new GSAConfiguration();
+               parameters.setDirection(EdgeDirection.ALL);
+
+               List<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdges();
+
+               edges.remove(0);
+
+               Graph<Long, HashSet<Long>, Long> graph = Graph
+                               
.fromCollection(TestGraphUtils.getLongLongVertices(), edges, env)
+                               .mapVertices(new 
GatherSumApplyConfigurationITCase.InitialiseHashSetMapper());
+
+               DataSet<Vertex<Long, HashSet<Long>>> resultedVertices = 
graph.runGatherSumApplyIteration(
+                               new GetReachableVertices(), new 
FindAllReachableVertices(), new UpdateReachableVertices(), 4,
+                               parameters)
+                               .getVertices();
+
+               List<Vertex<Long, HashSet<Long>>> result = 
resultedVertices.collect();
+
+               expectedResult = "1,[1, 2, 3, 4, 5]\n"
+                               +"2,[1, 2, 3, 4, 5]\n"
+                               +"3,[1, 2, 3, 4, 5]\n"
+                               +"4,[1, 2, 3, 4, 5]\n"
+                               +"5,[1, 2, 3, 4, 5]\n";
+
+               compareResultAsTuples(result, expectedResult);
+       }
+
        @SuppressWarnings("serial")
        private static final class Gather extends GatherFunction<Long, Long, 
Long> {
 
@@ -243,4 +357,47 @@ public class GatherSumApplyConfigurationITCase extends 
MultipleProgramsTestBase
                        return 1l;
                }
        }
+
+       @SuppressWarnings("serial")
+       public static final class InitialiseHashSetMapper implements 
MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+
+               @Override
+               public HashSet<Long> map(Vertex<Long, Long> value) throws 
Exception {
+                       HashSet<Long> h = new HashSet<Long>();
+                       h.add(value.getId());
+                       return h;
+               }
+       }
+
+       private static final class GetReachableVertices extends 
GatherFunction<HashSet<Long>, Long, HashSet<Long>> {
+
+               @Override
+               public HashSet<Long> gather(Neighbor<HashSet<Long>, Long> 
neighbor) {
+                       return neighbor.getNeighborValue();
+               }
+       }
+
+       private static final class FindAllReachableVertices extends 
SumFunction<HashSet<Long>, Long, HashSet<Long>> {
+               @Override
+               public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> 
currentSet) {
+                       HashSet<Long> set = currentSet;
+                       for(Long l : newSet) {
+                               set.add(l);
+                       }
+                       return set;
+               }
+       }
+
+       private static final class UpdateReachableVertices extends 
ApplyFunction<Long, HashSet<Long>, HashSet<Long>> {
+
+               @Override
+               public void apply(HashSet<Long> newValue, HashSet<Long> 
currentValue) {
+                       newValue.addAll(currentValue);
+                       if(newValue.size()>currentValue.size()) {
+                               setResult(newValue);
+                       }
+               }
+       }
+
+
 }

Reply via email to