[FLINK-1201] [gelly] created fromTupleDataSet() methods

issue #53


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/878118a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/878118a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/878118a1

Branch: refs/heads/master
Commit: 878118a1996f4b62b2830b3d309206c8b383a393
Parents: 0498abc
Author: Carsten Brandt <m...@cebe.cc>
Authored: Sat Jan 17 01:18:26 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 78 ++++++++++++++++++++
 1 file changed, 78 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/878118a1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 307f177..6561803 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -237,6 +237,68 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        }
 
        /**
+        * Creates a graph from a DataSet of Tuple objects for vertices and 
edges.
+        *
+        * The Tuple2 DataSet is reinterpreted as a Vertex DataSet and the Tuple3 DataSet as an Edge
+        * DataSet via unchecked casts only; TODO(review): confirm elements are Vertex/Edge at runtime.
+        *
+        * @param vertices a DataSet of Tuple2 objects, one per vertex.
+        * @param edges a DataSet of Tuple3 objects, one per edge.
+        * @param context the flink execution environment, passed on to fromDataSet.
+        * @return the graph returned by fromDataSet for the cast vertex and edge DataSets.
+        */
+       @SuppressWarnings({ "unchecked" })
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> 
vertices,
+                                                                               
   DataSet<Tuple3<K, K, EV>> edges,
+                                                                               
   ExecutionEnvironment context) {
+
+               DataSet<Vertex<K, VV>> vertexDataSet = (DataSet<Vertex<K, VV>>) 
(DataSet<?>) vertices;
+               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
+               return fromDataSet(vertexDataSet, edgeDataSet, context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges.
+        *
+        * The Tuple3 DataSet is reinterpreted as an Edge DataSet via an unchecked cast (no conversion).
+        * Vertex creation is delegated to fromDataSet; the returned graph's vertex value type is 
NullValue.
+        *
+        * @param edges a DataSet of Tuple3 objects, one per edge.
+        * @param context the flink execution environment, passed on to fromDataSet.
+        * @return the graph returned by fromDataSet for the cast edge DataSet.
+        */
+       @SuppressWarnings({ "unchecked" })
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
+               Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, 
EV>> edges,
+                                                                               
                  ExecutionEnvironment context) {
+
+               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
+               return fromDataSet(edgeDataSet, context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges and
+        * vertex values are calculated by a mapper function.
+        * The Tuple3 DataSet is reinterpreted as an Edge DataSet via an unchecked cast (no conversion).
+        * Vertex creation is delegated to fromDataSet, which is expected to set the vertex values
+        * by applying the provided map function to the vertex ids.
+        * @param edges a DataSet of Tuple3 objects, one per edge.
+        * @param mapper the mapper function, passed on unchanged to fromDataSet.
+        * @param context the flink execution environment, passed on to fromDataSet.
+        * @return the graph returned by fromDataSet for the cast edge DataSet and the mapper.
+        */
+       @SuppressWarnings({ "unchecked" })
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+       Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges,
+                                                                          
final MapFunction<K, VV> mapper,
+                                                                          
ExecutionEnvironment context) {
+
+               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
+               return fromDataSet(edgeDataSet, mapper, context);
+       }
+
+       /**
         * @return the flink execution environment.
         */
        public ExecutionEnvironment getContext() {
@@ -265,6 +327,22 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                return edges;
        }
 
+       /**
+        * @return this graph's vertex DataSet itself, exposed as a DataSet of Tuple2 via an unchecked cast (no copy or conversion).
+        */
+       @SuppressWarnings({ "unchecked" })
+       public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
+               return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices;
+       }
+
+       /**
+        * @return this graph's edge DataSet itself, exposed as a DataSet of Tuple3 via an unchecked cast (no copy or conversion).
+        */
+       @SuppressWarnings({ "unchecked" })
+       public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
+               return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges;
+       }
+
     /**
      * Apply a function to the attribute of each vertex in the graph.
      * @param mapper the map function to apply.

Reply via email to