Repository: flink Updated Branches: refs/heads/master 8f28f41dc -> beb7f3122
[FLINK-1975][gelly] Graph getUndirected improvement This closes #653 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/beb7f312 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/beb7f312 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/beb7f312 Branch: refs/heads/master Commit: beb7f31227766cfa17691525ccc666d105d442da Parents: 8f28f41 Author: andralungu <lungu.an...@gmail.com> Authored: Fri May 8 19:01:53 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri May 15 15:36:38 2015 +0200 ---------------------------------------------------------------------- .../src/main/java/org/apache/flink/graph/Graph.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/beb7f312/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 33d8e1c..0ddd2d4 100755 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -714,7 +714,7 @@ public class Graph<K, VV, EV> { */ public Graph<K, VV, EV> getUndirected() { - DataSet<Edge<K, EV>> undirectedEdges = edges.union(edges.map(new ReverseEdgesMap<K, EV>())); + DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>()); return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context); } @@ -936,6 +936,16 @@ public class Graph<K, VV, EV> { } } + private static final class RegularAndReversedEdgesMap<K, EV> + implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> { + + @Override + public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception { + out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2)); + out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2)); + } + } + /** * Reverse the direction of the edges in the graph *