[FLINK-1201] [gelly] set joins to sort-merge joins in isWeaklyConnected
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0e17e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0e17e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0e17e39
Branch: refs/heads/master
Commit: b0e17e39d8318bcd5d905098b2dce1cfd5c165f9
Parents: d57a9d7
Author: vasia <vasilikikala...@gmail.com>
Authored: Sun Jan 4 19:07:38 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100
----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/graph/Graph.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0e17e39/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 948256e..ecffbcc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -761,11 +762,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                 .iterateDelta(verticesWithInitialIds, maxIterations, 0);
 
         DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
-                .join(edgeIds).where(0).equalTo(0)
+                .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
+                .where(0).equalTo(0)
                 .with(new FindNeighborsJoin<K>())
                 .groupBy(0)
                 .aggregate(Aggregations.MIN, 1)
-                .join(iteration.getSolutionSet()).where(0).equalTo(0)
+                .join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE)
+                .where(0).equalTo(0)
                 .with(new VertexWithNewComponentJoin<K>());
 
         DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes);