[FLINK-1201] [gelly] remove isUndirected flag; methods will always consider the graph as directed
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b98bebc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b98bebc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b98bebc Branch: refs/heads/master Commit: 2b98bebc28e13b9bd933884961ced757b653936f Parents: 878118a Author: vasia <vasilikikala...@gmail.com> Authored: Mon Jan 19 16:18:08 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 469 ++++++++----------- 1 file changed, 196 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b98bebc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 6561803..3caf13d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -72,230 +72,18 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab private final ExecutionEnvironment context; private final DataSet<Vertex<K, VV>> vertices; private final DataSet<Edge<K, EV>> edges; - private boolean isUndirected; /** - * Creates a graph from two DataSets: vertices and edges + * Creates a graph from two datasets: vertices and edges and allow setting the undirected property * * @param vertices a DataSet of vertices. * @param edges a DataSet of vertices. * @param context the flink execution environment. */ - private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { - - /** a graph is directed by default */ - this(vertices, edges, context, false); - } - - /** - * Creates a graph from two DataSets: vertices and edges and allow setting the undirected property - * - * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @param undirected whether this is an undirected graph - */ - private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context, - boolean undirected) { + public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { this.vertices = vertices; this.edges = edges; this.context = context; - this.isUndirected = undirected; - } - - /** - * Creates a graph from a Collection of vertices and a Collection of edges. - * @param vertices a Collection of vertices. - * @param edges a Collection of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, - Collection<Edge<K,EV>> edges, - ExecutionEnvironment context) { - - return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context); - } - - /** - * Creates a graph from a Collection of edges, vertices are induced from the edges. - * Vertices are created automatically and their values are set to NullValue. - * @param edges a Collection of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> edges, ExecutionEnvironment context) { - - return fromDataSet(context.fromCollection(edges), context); - } - - /** - * Creates a graph from a Collection of edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a Collection of vertices. - * @param mapper the mapper function. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { - - return fromDataSet(context.fromCollection(edges), mapper, context); - } - - /** - * Creates a graph from a DataSet of vertices and a DataSet of edges. - * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices, - DataSet<Edge<K,EV>> edges, - ExecutionEnvironment context) { - - return new Graph<K, VV, EV>(vertices, edges, context); - } - - /** - * Creates a graph from a DataSet of edges, vertices are induced from the edges. - * Vertices are created automatically and their values are set to NullValue. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges, - ExecutionEnvironment context) { - - DataSet<Vertex<K, NullValue>> vertices = - edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); - - return new Graph<K, NullValue, EV>(vertices, edges, context); - } - - private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> - implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { - - public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) { - out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); - out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance())); - } - } - - /** - * Creates a graph from a DataSet of edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a DataSet of vertices. - * @param mapper the mapper function. - * @param context the flink execution environment. - * @return the newly created graph. - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { - - TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - - TypeInformation<VV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) - new TupleTypeInfo(Vertex.class, keyType, valueType); - - DataSet<Vertex<K, VV>> vertices = - edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) - .distinct() - .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() { - public Vertex<K, VV> map(Tuple1<K> value) throws Exception { - return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); - } - }) - .returns(returnType); - - return new Graph<K, VV, EV>(vertices, edges, context); - } - - private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { - - public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { - out.collect(new Tuple1<K>(edge.f0)); - out.collect(new Tuple1<K>(edge.f1)); - } - } - - /** - * Creates a graph from a DataSet of Tuple objects for vertices and edges. - * - * Vertices with value are created from Tuple2, - * Edges with value are created from Tuple3. - * - * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - @SuppressWarnings({ "unchecked" }) - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> vertices, - DataSet<Tuple3<K, K, EV>> edges, - ExecutionEnvironment context) { - - DataSet<Vertex<K, VV>> vertexDataSet = (DataSet<Vertex<K, VV>>) (DataSet<?>) vertices; - DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; - return fromDataSet(vertexDataSet, edgeDataSet, context); - } - - /** - * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges. - * - * Edges with value are created from Tuple3. - * Vertices are created automatically and their values are set to NullValue. - * - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph. - */ - @SuppressWarnings({ "unchecked" }) - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, - ExecutionEnvironment context) { - - DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; - return fromDataSet(edgeDataSet, context); - } - - /** - * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Edges with value are created from Tuple3. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a DataSet of vertices. - * @param mapper the mapper function. - * @param context the flink execution environment. - * @return the newly created graph. - */ - @SuppressWarnings({ "unchecked" }) - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { - - DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) (DataSet<?>) edges; - return fromDataSet(edgeDataSet, mapper, context); } /** @@ -314,35 +102,19 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * @return the vertex DataSet. + * @return the vertex dataset. */ public DataSet<Vertex<K, VV>> getVertices() { return vertices; } /** - * @return the edge DataSet. + * @return the edge dataset. */ public DataSet<Edge<K, EV>> getEdges() { return edges; } - /** - * @return the vertex DataSet as Tuple2. - */ - @SuppressWarnings({ "unchecked" }) - public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() { - return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; - } - - /** - * @return the edge DataSet as Tuple3. - */ - @SuppressWarnings({ "unchecked" }) - public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() { - return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; - } - /** * Apply a function to the attribute of each vertex in the graph. * @param mapper the map function to apply. @@ -367,7 +139,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab }) .returns(returnType); - return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context); + return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context); } /** @@ -408,7 +180,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper)); - return new Graph(resultedVertices, this.edges, this.context); + return Graph.create(resultedVertices, this.getEdges(), this.getContext()); } private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, @@ -454,7 +226,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0,1).equalTo(0,1) .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper)); - return new Graph(this.vertices, resultedEdges, this.context); + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); } private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, @@ -503,7 +275,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper)); - return new Graph(this.vertices, resultedEdges, this.context); + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); } private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, @@ -555,7 +327,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab .coGroup(inputDataSet).where(1).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper)); - return new Graph(this.vertices, resultedEdges, this.context); + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); } /** @@ -667,19 +439,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Convert the directed graph into an undirected graph - * by adding all inverse-direction edges. + * This operation adds all inverse-direction edges + * to the graph. * @return the undirected graph. */ public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException { - if (this.isUndirected) { - throw new UnsupportedOperationException("The graph is already undirected."); - } - else { DataSet<Edge<K, EV>> undirectedEdges = edges.union(edges.map(new ReverseEdgesMap<K, EV>())); - return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context, true); - } + return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context); } /** @@ -878,13 +645,122 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @throws UnsupportedOperationException */ public Graph<K, VV, EV> reverse() throws UnsupportedOperationException { - if (this.isUndirected) { - throw new UnsupportedOperationException("The graph is already undirected."); - } - else { - DataSet<Edge<K, EV>> undirectedEdges = edges.map(new ReverseEdgesMap<K, EV>()); - return new Graph<K, VV, EV>(this.vertices, undirectedEdges, this.context, true); - } + DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>()); + return new Graph<K, VV, EV>(vertices, reversedEdges, this.context); + } + + /** + * Creates a graph from a dataset of vertices and a dataset of edges + * @param vertices a DataSet of vertices. + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable> Graph<K, VV, EV> + create(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, + ExecutionEnvironment context) { + return new Graph<K, VV, EV>(vertices, edges, context); + } + + /** + * Creates a graph from a DataSet of edges. + * Vertices are created automatically and their values are set to NullValue. + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph + */ + public static <K extends Comparable<K> & Serializable, EV extends Serializable> + Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { + DataSet<Vertex<K, NullValue>> vertices = + edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); + return new Graph<K, NullValue, EV>(vertices, edges, context); + } + + /** + * Creates a graph from a DataSet of edges. + * Vertices are created automatically and their values are set + * by applying the provided map function to the vertex ids. + * @param edges the input edges + * @param mapper the map function to set the initial vertex value + * @return the newly created graph + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); + + TypeInformation<VV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) + new TupleTypeInfo(Vertex.class, keyType, valueType); + + DataSet<Vertex<K, VV>> vertices = + edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) + .distinct().map(new MapFunction<Tuple1<K>, Vertex<K, VV>>(){ + public Vertex<K, VV> map(Tuple1<K> value) throws Exception { + return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); + } + }).returns(returnType); + return new Graph<K, VV, EV>(vertices, edges, context); + } + + private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { + public void flatMap(Edge<K, EV> edge, + Collector<Vertex<K, NullValue>> out) { + + out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); + out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance())); + } + } + + private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, + EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { + public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { + + out.collect(new Tuple1<K>(edge.f0)); + out.collect(new Tuple1<K>(edge.f1)); + } + } + + /** + * Read and create the graph vertex Tuple2 DataSet from a csv file + * + * The CSV file should be of the following format: + * + * <vertexID><delimiter><vertexValue> + * + * For example, with space delimiter: + * + * 1 57 + * 2 45 + * 3 77 + * 4 12 + * + * @param context the flink execution environment. + * @param filePath the path to the CSV file. + * @param delimiter the CSV delimiter. + * @param Tuple2IdClass The class to use for Vertex IDs + * @param Tuple2ValueClass The class to use for Vertex Values + * @return a set of vertices and their values. + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable> + DataSet<Tuple2<K, VV>> + readTuple2CsvFile(ExecutionEnvironment context, String filePath, + char delimiter, Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass) { + + CsvReader reader = new CsvReader(filePath, context); + DataSet<Tuple2<K, VV>> vertices = reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass) + .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() { + + public Tuple2<K, VV> map(Tuple2<K, VV> value) throws Exception { + return (Tuple2<K, VV>)value; + } + }); + return vertices; } /** @@ -937,17 +813,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return true if the graph is weakly connected. */ public DataSet<Boolean> isWeaklyConnected (int maxIterations) { - Graph<K, VV, EV> graph; - - if (!(this.isUndirected)) { - // first, convert to an undirected graph - graph = this.getUndirected(); - } - else { - graph = this; - } + // first, convert to an undirected graph + Graph<K, VV, EV> graph = this.getUndirected(); - DataSet<K> vertexIds = graph.getVertexIds(); + DataSet<K> vertexIds = graph.getVertexIds(); DataSet<Tuple2<K,K>> verticesWithInitialIds = vertexIds .map(new DuplicateVertexIDMapper<K>()); @@ -1011,6 +880,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } + public Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, Collection<Edge<K,EV>> edges) { + + DataSet<Vertex<K, VV>> v = context.fromCollection(vertices); + DataSet<Edge<K, EV>> e = context.fromCollection(edges); + + return new Graph<K, VV, EV>(v, e, context); + } + /** * Adds the input vertex and edges to the graph. * If the vertex already exists in the graph, it will not be added again, @@ -1025,14 +902,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab // Take care of empty edge set if (edges.isEmpty()) { - return new Graph(this.vertices.union(newVertex).distinct(), this.edges, this.context); + return Graph.create(getVertices().union(newVertex).distinct(), getEdges(), context); } // Add the vertex and its edges - DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct(); - DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges)); + DataSet<Vertex<K, VV>> newVertices = getVertices().union(newVertex).distinct(); + DataSet<Edge<K, EV>> newEdges = getEdges().union(context.fromCollection(edges)); - return new Graph(newVertices, newEdges, this.context); + return Graph.create(newVertices, newEdges, context); } /** @@ -1045,11 +922,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return the new graph containing the existing vertices and edges plus the newly added edge */ public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, EV edgeValue) { - Graph<K,VV,EV> partialGraph = fromCollection( - Arrays.asList(source, target), - Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)), - this.context - ); + Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, target), + Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue))); return this.union(partialGraph); } @@ -1112,7 +986,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) { DataSet<Edge<K, EV>> newEdges = getEdges().filter( new EdgeRemovalEdgeFilter<K, EV>(edge)); - return new Graph<K, VV, EV>(this.vertices, newEdges, this.context); + return new Graph<K, VV, EV>(this.getVertices(), newEdges, this.context); } private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, @@ -1154,9 +1028,58 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Vertex<K, VV>> newVertices = vertices.runOperation( VertexCentricIteration.withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); - return new Graph<K, VV, EV>(newVertices, this.edges, this.context); + return new Graph<K, VV, EV>(newVertices, edges, context); } + /** + * Creates a graph from the given vertex and edge collections + * @param context the flink execution environment. + * @param v the collection of vertices + * @param e the collection of edges + * @return a new graph formed from the set of edges and vertices + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable> Graph<K, VV, EV> + fromCollection(ExecutionEnvironment context, Collection<Vertex<K, VV>> v, + Collection<Edge<K, EV>> e) throws Exception { + + DataSet<Vertex<K, VV>> vertices = context.fromCollection(v); + DataSet<Edge<K, EV>> edges = context.fromCollection(e); + + return Graph.create(vertices, edges, context); + } + + /** + * Vertices may not have a value attached or may receive a value as a result of running the algorithm. + * @param context the flink execution environment. + * @param e the collection of edges + * @return a new graph formed from the edges, with no value for the vertices + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable> Graph<K, NullValue, EV> + fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e) { + + DataSet<Edge<K, EV>> edges = context.fromCollection(e); + + return Graph.create(edges, context); + } + + /** + * Vertices may have an initial value defined by a function. + * @param context the flink execution environment. + * @param e the collection of edges + * @return a new graph formed from the edges, with a custom value for the vertices, + * determined by the mapping function + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable> Graph<K, VV, EV> + fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e, + final MapFunction<K, VV> mapper) { + + DataSet<Edge<K, EV>> edges = context.fromCollection(e); + return Graph.create(edges, mapper, context); + } + public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) { return algorithm.run(this); }