[FLINK-1201] [gelly] set joins to sort-merge joins in isWeaklyConnected

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0e17e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0e17e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0e17e39

Branch: refs/heads/master
Commit: b0e17e39d8318bcd5d905098b2dce1cfd5c165f9
Parents: d57a9d7
Author: vasia <vasilikikala...@gmail.com>
Authored: Sun Jan 4 19:07:38 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/graph/Graph.java           | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0e17e39/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 948256e..ecffbcc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -761,11 +762,13 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                 .iterateDelta(verticesWithInitialIds, maxIterations, 0);
 
         DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
-                .join(edgeIds).where(0).equalTo(0)
+                .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
+                .where(0).equalTo(0)
                 .with(new FindNeighborsJoin<K>())
                 .groupBy(0)
                 .aggregate(Aggregations.MIN, 1)
-                .join(iteration.getSolutionSet()).where(0).equalTo(0)
+                .join(iteration.getSolutionSet(), 
JoinHint.REPARTITION_SORT_MERGE)
+                .where(0).equalTo(0)
                 .with(new VertexWithNewComponentJoin<K>());
 
         DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, 
changes);

Reply via email to