[FLINK-1201] [gelly] created fromTupleDataSet() methods issue #53
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/878118a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/878118a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/878118a1 Branch: refs/heads/master Commit: 878118a1996f4b62b2830b3d309206c8b383a393 Parents: 0498abc Author: Carsten Brandt <m...@cebe.cc> Authored: Sat Jan 17 01:18:26 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 78 ++++++++++++++++++++ 1 file changed, 78 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/878118a1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 307f177..6561803 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -237,6 +237,68 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** + * Creates a graph from a DataSet of Tuple objects for vertices and edges. + * + * Vertices with value are created from Tuple2, + * Edges with value are created from Tuple3. + * + * @param vertices a DataSet of vertices. + * @param edges a DataSet of edges. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + @SuppressWarnings({ "unchecked" }) + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> vertices, + DataSet<Tuple3<K, K, EV>> edges, + ExecutionEnvironment context) { + + DataSet<Vertex<K, VV>> vertexDataSet = (DataSet<Vertex<K, VV>>) (DataSet<?>) vertices; + DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; + return fromDataSet(vertexDataSet, edgeDataSet, context); + } + + /** + * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges. + * + * Edges with value are created from Tuple3. + * Vertices are created automatically and their values are set to NullValue. + * + * @param edges a DataSet of edges. + * @param context the flink execution environment. + * @return the newly created graph. + */ + @SuppressWarnings({ "unchecked" }) + public static <K extends Comparable<K> & Serializable, EV extends Serializable> + Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, + ExecutionEnvironment context) { + + DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; + return fromDataSet(edgeDataSet, context); + } + + /** + * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges and + * vertex values are calculated by a mapper function. + * Edges with value are created from Tuple3. + * Vertices are created automatically and their values are set + * by applying the provided map function to the vertex ids. + * @param edges a DataSet of edges. + * @param mapper the mapper function. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + @SuppressWarnings({ "unchecked" }) + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, + final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { + + DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; + return fromDataSet(edgeDataSet, mapper, context); + } + + /** * @return the flink execution environment. */ public ExecutionEnvironment getContext() { @@ -265,6 +327,22 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return edges; } + /** + * @return the vertex DataSet as Tuple2. + */ + @SuppressWarnings({ "unchecked" }) + public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() { + return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; + } + + /** + * @return the edge DataSet as Tuple3. + */ + @SuppressWarnings({ "unchecked" }) + public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() { + return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; + } + /** * Apply a function to the attribute of each vertex in the graph. * @param mapper the map function to apply.