[FLINK-1201] [gelly] Updated JavaDoc for Graph class
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b511932d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b511932d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b511932d Branch: refs/heads/master Commit: b511932d795f5dd8e20a1cf13143aa149c89c368 Parents: 1d6a200 Author: Carsten Brandt <m...@cebe.cc> Authored: Thu Jan 15 19:53:23 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 305 +++++++++++-------- 1 file changed, 180 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b511932d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index fb72a20..646c747 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -56,6 +56,17 @@ import flink.graphs.utils.GraphUtils; import flink.graphs.utils.Tuple2ToVertexMap; import flink.graphs.validation.GraphValidator; +/** + * Represents a Graph consisting of {@link Edge edges} and {@link Vertex vertices}. 
+ * + * + * @see flink.graphs.Edge + * @see flink.graphs.Vertex + * + * @param <K> the key type for edge and vertex identifiers + * @param <VV> the value type for vertices + * @param <EV> the value type for edges + */ @SuppressWarnings("serial") public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { @@ -64,12 +75,27 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab private final DataSet<Edge<K, EV>> edges; private boolean isUndirected; + /** + * Creates a graph from two datasets: vertices and edges + * + * @param vertices a DataSet of vertices. + * @param edges a DataSet of edges. + * @param context the flink execution environment. + */ public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { /** a graph is directed by default */ this(vertices, edges, context, false); } + /** + * Creates a graph from two datasets: vertices and edges and allows setting the undirected property + * + * @param vertices a DataSet of vertices. + * @param edges a DataSet of edges. + * @param context the flink execution environment. + * @param undirected whether this is an undirected graph + */ public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context, boolean undirected) { this.vertices = vertices; @@ -78,29 +104,38 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab this.isUndirected = undirected; } + /** + * @return the flink execution environment. + */ public ExecutionEnvironment getContext() { return this.context; } /** * Function that checks whether a graph's ids are valid - * @return - */ + * @return true if the graph's ids are valid, false if not. + */ public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) { return validator.validate(this); } + /** + * @return the vertex dataset. 
+ */ public DataSet<Vertex<K, VV>> getVertices() { return vertices; } + /** + * @return the edge dataset. + */ public DataSet<Edge<K, EV>> getEdges() { return edges; } /** * Apply a function to the attribute of each vertex in the graph. - * @param mapper + * @param mapper the map function to apply. * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -124,11 +159,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context); } - + /** * Apply a function to the attribute of each edge in the graph. - * @param mapper - * @return + * @param mapper the map function to apply. + * @return a new graph */ @SuppressWarnings({ "unchecked", "rawtypes" }) public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { @@ -152,10 +187,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Method that joins the vertex DataSet with an input DataSet and applies a UDF on the resulted values. - * @param inputDataSet - * @param mapper - the UDF applied - * @return - a new graph where the vertex values have been updated. + * Joins the vertex DataSet of this graph with an input DataSet and applies a UDF on the resulted values. + * @param inputDataSet the DataSet to join with. + * @param mapper the UDF map function to apply. + * @return a new graph where the vertex values have been updated. */ public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, final MapFunction<Tuple2<VV, T>, VV> mapper) { @@ -196,12 +231,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Method that joins the edge DataSet with an input DataSet on a composite key of both source and target + * Joins the edge DataSet with an input DataSet on a composite key of both source and target * and applies a UDF on the resulted values. 
- * @param inputDataSet - * @param mapper - the UDF applied + * @param inputDataSet the DataSet to join with. + * @param mapper the UDF map function to apply. * @param <T> - * @return - a new graph where the edge values have been updated. + * @return a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, final MapFunction<Tuple2<EV, T>, EV> mapper) { @@ -243,13 +278,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Method that joins the edge DataSet with an input DataSet on the source key of the edges and the first attribute + * Joins the edge DataSet with an input DataSet on the source key of the edges and the first attribute * of the input DataSet and applies a UDF on the resulted values. - * Should the inputDataSet contain the same key more than once, only the first value will be considered. - * @param inputDataSet - * @param mapper - the UDF applied + * In case the inputDataSet contains the same key more than once, only the first value will be considered. + * @param inputDataSet the DataSet to join with. + * @param mapper the UDF map function to apply. * @param <T> - * @return - a new graph where the edge values have been updated. + * @return a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet, final MapFunction<Tuple2<EV, T>, EV> mapper) { @@ -295,13 +330,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Method that joins the edge DataSet with an input DataSet on the target key of the edges and the first attribute + * Joins the edge DataSet with an input DataSet on the target key of the edges and the first attribute * of the input DataSet and applies a UDF on the resulted values. * Should the inputDataSet contain the same key more than once, only the first value will be considered. 
- * @param inputDataSet - * @param mapper - the UDF applied + * @param inputDataSet the DataSet to join with. + * @param mapper the UDF map function to apply. * @param <T> - * @return - a new graph where the edge values have been updated. + * @return a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet, final MapFunction<Tuple2<EV, T>, EV> mapper) { @@ -314,14 +349,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Apply value-based filtering functions to the graph - * and return a sub-graph that satisfies the predicates - * for both vertex values and edge values. - * @param vertexFilter - * @param edgeFilter - * @return - */ - public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) { + * Apply filtering functions to the graph + * and return a sub-graph that satisfies the predicates + * for both vertices and edges. + * @param vertexFilter the filter function for vertices. + * @param edgeFilter the filter function for edges. + * @return the resulting sub-graph. + */ + public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) { DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); @@ -337,11 +372,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Apply value-based filtering functions to the graph + * Apply a filtering function to the graph * and return a sub-graph that satisfies the predicates * only for the vertices. - * @param vertexFilter - * @return + * @param vertexFilter the filter function for vertices. + * @return the resulting sub-graph. 
*/ public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) { @@ -357,18 +392,18 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Apply value-based filtering functions to the graph + * Apply a filtering function to the graph * and return a sub-graph that satisfies the predicates * only for the edges. - * @param edgeFilter - * @return + * @param edgeFilter the filter function for edges. + * @return the resulting sub-graph. */ public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) { DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter); return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context); } - + @ConstantFieldsFirst("0->0;1->1;2->2") private static final class ProjectEdge<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<Edge<K,EV>, Vertex<K,VV>, @@ -424,7 +459,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Convert the directed graph into an undirected graph * by adding all inverse-direction edges. - * + * @return the undirected graph. */ public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException { if (this.isUndirected) { @@ -644,9 +679,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Creates a graph from a dataset of vertices and a dataset of edges - * @param vertices - * @param edges - * @return + * @param vertices a DataSet of vertices. + * @param edges a DataSet of edges. + * @param context the flink execution environment. + * @return the newly created graph */ public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> @@ -658,8 +694,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Creates a graph from a DataSet of edges. 
* Vertices are created automatically and their values are set to NullValue. - * @param edges - * @return + * @param edges a DataSet of edges. + * @param context the flink execution environment. + * @return the newly created graph */ public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { @@ -674,7 +711,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * by applying the provided map function to the vertex ids. * @param edges the input edges * @param mapper the map function to set the initial vertex value - * @return + * @return the newly created graph */ public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, @@ -718,19 +755,32 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Read and create the graph Tuple2 dataset from a csv file - * @param env - * @param filePath - * @param delimiter - * @param Tuple2IdClass - * @param Tuple2ValueClass - * @return + * Read and create the graph vertex Tuple2 DataSet from a csv file + * + * The CSV file should be of the following format: + * + * <vertexID><delimiter><vertexValue> + * + * For example, with space delimiter: + * + * 1 57 + * 2 45 + * 3 77 + * 4 12 + * + * @param context the flink execution environment. + * @param filePath the path to the CSV file. + * @param delimiter the CSV delimiter. + * @param Tuple2IdClass The class to use for Vertex IDs + * @param Tuple2ValueClass The class to use for Vertex Values + * @return a set of vertices and their values. 
*/ public static <K extends Comparable<K> & Serializable, VV extends Serializable> - DataSet<Tuple2<K, VV>> readTuple2CsvFile(ExecutionEnvironment env, String filePath, + DataSet<Tuple2<K, VV>> + readTuple2CsvFile(ExecutionEnvironment context, String filePath, char delimiter, Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass) { - CsvReader reader = new CsvReader(filePath, env); + CsvReader reader = new CsvReader(filePath, context); DataSet<Tuple2<K, VV>> vertices = reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass) .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() { @@ -741,23 +791,21 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return vertices; } - /** - * @return Singleton DataSet containing the vertex count - */ + /** + * @return Singleton DataSet containing the vertex count + */ public DataSet<Integer> numberOfVertices () { return GraphUtils.count(vertices, context); } - /** - * - * @return Singleton DataSet containing the edge count - */ + /** + * @return Singleton DataSet containing the edge count + */ public DataSet<Integer> numberOfEdges () { return GraphUtils.count(edges, context); } /** - * * @return The IDs of the vertices as DataSet */ public DataSet<K> getVertexIds () { @@ -772,6 +820,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } + /** + * @return The IDs of the edges as DataSet + */ public DataSet<Tuple2<K, K>> getEdgeIds () { return edges.map(new ExtractEdgeIDsMapper<K, EV>()); } @@ -784,11 +835,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - /** - * Checks the weak connectivity of a graph. - * @param maxIterations the maximum number of iterations for the inner delta iteration - * @return true if the graph is weakly connected. - */ + /** + * Checks the weak connectivity of a graph. 
+ * @param maxIterations the maximum number of iterations for the inner delta iteration + * @return true if the graph is weakly connected. + */ public DataSet<Boolean> isWeaklyConnected (int maxIterations) { Graph<K, VV, EV> graph; @@ -870,17 +921,17 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Edge<K, EV>> e = context.fromCollection(edges); return new Graph<K, VV, EV>(v, e, context); - } + } - /** - * Adds the input vertex and edges to the graph. - * If the vertex already exists in the graph, it will not be added again, - * but the given edges will. - * @param vertex - * @param edges - * @return - */ - @SuppressWarnings("unchecked") + /** + * Adds the input vertex and edges to the graph. + * If the vertex already exists in the graph, it will not be added again, + * but the given edges will. + * @param vertex the vertex to add to the graph + * @param edges a list of edges to add to the graph + * @return the new graph containing the existing and newly added vertices and edges + */ + @SuppressWarnings("unchecked") public Graph<K, VV, EV> addVertex (final Vertex<K,VV> vertex, List<Edge<K, EV>> edges) { DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex); @@ -896,14 +947,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return Graph.create(newVertices, newEdges, context); } - /** + /** * Adds the given edge to the graph. * If the source and target vertices do not exist in the graph, * they will also be added. 
- * @param source - * @param target - * @param edgeValue - * @return + * @param source the source vertex of the edge + * @param target the target vertex of the edge + * @param edgeValue the edge value + * @return the new graph containing the existing vertices and edges plus the newly added edge */ public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, EV edgeValue) { Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, target), @@ -911,12 +962,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return this.union(partialGraph); } - /** - * Removes the given vertex and its edges from the graph. - * @param vertex - * @return - */ - public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) { + /** + * Removes the given vertex and its edges from the graph. + * @param vertex the vertex to remove + * @return the new graph containing the existing vertices and edges without the removed vertex and its edges + */ + public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) { + DataSet<Vertex<K, VV>> newVertices = getVertices().filter( new RemoveVertexFilter<K, VV>(vertex)); DataSet<Edge<K, EV>> newEdges = getEdges().filter( @@ -960,11 +1012,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return true; } } - + /** * Removes all edges that match the given edge from the graph. - * @param edge - * @return + * @param edge the edge to remove + * @return the new graph containing the existing vertices and edges without the removed edges */ public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) { DataSet<Edge<K, EV>> newEdges = getEdges().filter( @@ -990,8 +1042,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Performs union on the vertices and edges sets of the input graphs * removing duplicate vertices but maintaining duplicate edges. 
- * @param graph - * @return + * @param graph the graph to perform union with + * @return a new graph */ public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) { DataSet<Vertex<K,VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); @@ -999,14 +1051,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return new Graph<K,VV,EV>(unionedVertices, unionedEdges, this.context); } - /** - * Runs a Vertex-Centric iteration on the graph. - * @param vertexUpdateFunction - * @param messagingFunction - * @param maximumNumberOfIterations - * @return - */ - @SuppressWarnings("unchecked") + /** + * Runs a Vertex-Centric iteration on the graph. + * @param vertexUpdateFunction the vertex update function + * @param messagingFunction the messaging function + * @param maximumNumberOfIterations maximum number of iterations to perform + * @return + */ + @SuppressWarnings("unchecked") public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; @@ -1018,49 +1070,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Creates a graph from the given vertex and edge collections - * @param env - * @param v the collection of vertices - * @param e the collection of edges - * @return a new graph formed from the set of edges and vertices - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, VV, EV> fromCollection(ExecutionEnvironment env, - Collection<Vertex<K, VV>> v, Collection<Edge<K, EV>> e) throws Exception { - DataSet<Vertex<K, VV>> vertices = env.fromCollection(v); - DataSet<Edge<K, EV>> edges = env.fromCollection(e); - - return Graph.create(vertices, edges, env); + * Creates a graph from the given vertex and 
edge collections + * @param context the flink execution environment. + * @param v the collection of vertices + * @param e the collection of edges + * @return a new graph formed from the set of edges and vertices + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, + EV extends Serializable> Graph<K, VV, EV> + fromCollection(ExecutionEnvironment context, Collection<Vertex<K, VV>> v, + Collection<Edge<K, EV>> e) throws Exception { + + DataSet<Vertex<K, VV>> vertices = context.fromCollection(v); + DataSet<Edge<K, EV>> edges = context.fromCollection(e); + + return Graph.create(vertices, edges, context); } /** * Vertices may not have a value attached or may receive a value as a result of running the algorithm. - * @param env + * @param context the flink execution environment. * @param e the collection of edges * @return a new graph formed from the edges, with no value for the vertices */ public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, NullValue, EV> fromCollection(ExecutionEnvironment env, - Collection<Edge<K, EV>> e) { + EV extends Serializable> Graph<K, NullValue, EV> + fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e) { - DataSet<Edge<K, EV>> edges = env.fromCollection(e); + DataSet<Edge<K, EV>> edges = context.fromCollection(e); - return Graph.create(edges, env); + return Graph.create(edges, context); } /** * Vertices may have an initial value defined by a function. - * @param env + * @param context the flink execution environment. 
* @param e the collection of edges * @return a new graph formed from the edges, with a custom value for the vertices, * determined by the mapping function */ public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, VV, EV> fromCollection(ExecutionEnvironment env, - Collection<Edge<K, EV>> e, - final MapFunction<K, VV> mapper) { - DataSet<Edge<K, EV>> edges = env.fromCollection(e); - return Graph.create(edges, mapper, env); + EV extends Serializable> Graph<K, VV, EV> + fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e, + final MapFunction<K, VV> mapper) { + + DataSet<Edge<K, EV>> edges = context.fromCollection(e); + return Graph.create(edges, mapper, context); } public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {