http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index a236478..59920a6 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package flink.graphs; +package org.apache.flink.graph; import java.io.Serializable; import java.util.Arrays; @@ -45,122 +45,128 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.graph.utils.GraphUtils; +import org.apache.flink.graph.utils.Tuple2ToVertexMap; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; +import org.apache.flink.graph.utils.VertexToTuple2Map; +import org.apache.flink.graph.validation.GraphValidator; import org.apache.flink.util.Collector; import org.apache.flink.types.NullValue; -import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.VertexCentricIteration; -import flink.graphs.spargel.VertexUpdateFunction; -import flink.graphs.utils.GraphUtils; -import flink.graphs.utils.Tuple2ToVertexMap; -import flink.graphs.utils.Tuple3ToEdgeMap; -import flink.graphs.validation.GraphValidator; - /** - * Represents a Graph consisting of {@link Edge edges} and {@link Vertex vertices}. - * - * - * @see flink.graphs.Edge - * @see flink.graphs.Vertex - * + * Represents a Graph consisting of {@link Edge edges} and {@link Vertex + * vertices}. + * + * + * @see org.apache.flink.graph.Edge + * @see org.apache.flink.graph.Vertex + * * @param <K> the key type for edge and vertex identifiers * @param <VV> the value type for vertexes * @param <EV> the value type for edges */ @SuppressWarnings("serial") -public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { +public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { - private final ExecutionEnvironment context; + private final ExecutionEnvironment context; private final DataSet<Vertex<K, VV>> vertices; private final DataSet<Edge<K, EV>> edges; /** - * Creates a graph from two DataSets: vertices and edges and allow setting the undirected property - * + * Creates a graph from two DataSets: vertices and edges and allow setting + * the undirected property + * * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. + * @param edges a DataSet of edges. * @param context the flink execution environment. */ private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { this.vertices = vertices; this.edges = edges; - this.context = context; + this.context = context; } /** * Creates a graph from a Collection of vertices and a Collection of edges. + * * @param vertices a Collection of vertices. - * @param edges a Collection of vertices. + * @param edges a Collection of edges. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, - Collection<Edge<K,EV>> edges, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection( + Collection<Vertex<K, VV>> vertices, Collection<Edge<K, EV>> edges, + ExecutionEnvironment context) { - return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context); + return fromDataSet(context.fromCollection(vertices), + context.fromCollection(edges), context); } /** - * Creates a graph from a Collection of edges, vertices are induced from the edges. - * Vertices are created automatically and their values are set to NullValue. + * Creates a graph from a Collection of edges, vertices are induced from the + * edges. Vertices are created automatically and their values are set to + * NullValue. + * * @param edges a Collection of vertices. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> edges, ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromCollection( + Collection<Edge<K, EV>> edges, ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), context); } /** - * Creates a graph from a Collection of edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a Collection of vertices. + * Creates a graph from a Collection of edges, vertices are induced from the + * edges and vertex values are calculated by a mapper function. Vertices are + * created automatically and their values are set by applying the provided + * map function to the vertex ids. + * + * @param edges a Collection of edges. * @param mapper the mapper function. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection( + Collection<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { return fromDataSet(context.fromCollection(edges), mapper, context); } /** * Creates a graph from a DataSet of vertices and a DataSet of edges. + * * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. + * @param edges a DataSet of edges. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices, - DataSet<Edge<K,EV>> edges, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet( + DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, + ExecutionEnvironment context) { return new Graph<K, VV, EV>(vertices, edges, context); } /** - * Creates a graph from a DataSet of edges, vertices are induced from the edges. - * Vertices are created automatically and their values are set to NullValue. - * @param edges a DataSet of vertices. + * Creates a graph from a DataSet of edges, vertices are induced from the + * edges. Vertices are created automatically and their values are set to + * NullValue. + * + * @param edges a DataSet of edges. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromDataSet( + DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { - DataSet<Vertex<K, NullValue>> vertices = - edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); + DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); return new Graph<K, NullValue, EV>(vertices, edges, context); } @@ -175,44 +181,41 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Creates a graph from a DataSet of edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a DataSet of vertices. + * Creates a graph from a DataSet of edges, vertices are induced from the + * edges and vertex values are calculated by a mapper function. Vertices are + * created automatically and their values are set by applying the provided + * map function to the vertex ids. + * + * @param edges a DataSet of edges. * @param mapper the mapper function. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet( + DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - TypeInformation<VV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + TypeInformation<VV> valueType = TypeExtractor.createTypeInfo( + MapFunction.class, mapper.getClass(), 1, null, null); @SuppressWarnings({ "unchecked", "rawtypes" }) - TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) - new TupleTypeInfo(Vertex.class, keyType, valueType); - - DataSet<Vertex<K, VV>> vertices = - edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) - .distinct() - .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() { - public Vertex<K, VV> map(Tuple1<K> value) throws Exception { - return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); - } - }) - .returns(returnType); + TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo( + Vertex.class, keyType, valueType); + + DataSet<Vertex<K, VV>> vertices = edges + .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct() + .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() { + public Vertex<K, VV> map(Tuple1<K> value) throws Exception { + return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); + } + }).returns(returnType); return new Graph<K, VV, EV>(vertices, edges, context); } - private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { + private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { out.collect(new Tuple1<K>(edge.f0)); @@ -222,19 +225,17 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Creates a graph from a DataSet of Tuple objects for vertices and edges. - * - * Vertices with value are created from Tuple2, - * Edges with value are created from Tuple3. - * - * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. + * + * Vertices with value are created from Tuple2, Edges with value are created + * from Tuple3. + * + * @param vertices a DataSet of Tuple2. + * @param edges a DataSet of Tuple3. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> vertices, - DataSet<Tuple3<K, K, EV>> edges, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet( + DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) { DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>()); DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); @@ -242,38 +243,37 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges. - * - * Edges with value are created from Tuple3. - * Vertices are created automatically and their values are set to NullValue. - * - * @param edges a DataSet of vertices. + * Creates a graph from a DataSet of Tuple objects for edges, vertices are + * induced from the edges. + * + * Edges with value are created from Tuple3. Vertices are created + * automatically and their values are set to NullValue. + * + * @param edges a DataSet of Tuple3. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, EV extends Serializable> Graph<K, NullValue, EV> fromTupleDataSet( + DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) { DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); return fromDataSet(edgeDataSet, context); } /** - * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges and - * vertex values are calculated by a mapper function. - * Edges with value are created from Tuple3. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. - * @param edges a DataSet of vertices. + * Creates a graph from a DataSet of Tuple objects for edges, vertices are + * induced from the edges and vertex values are calculated by a mapper + * function. Edges with value are created from Tuple3. Vertices are created + * automatically and their values are set by applying the provided map + * function to the vertex ids. + * + * @param edges a DataSet of Tuple3. * @param mapper the mapper function. * @param context the flink execution environment. * @return the newly created graph. */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, - final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet( + DataSet<Tuple3<K, K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); return fromDataSet(edgeDataSet, mapper, context); @@ -288,6 +288,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Function that checks whether a graph's ids are valid + * * @return true if the graph's ids are valid, false if not. */ public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) { @@ -311,79 +312,79 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * @return the vertex DataSet as Tuple2. */ - @SuppressWarnings({ "unchecked" }) public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() { - return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; + return vertices.map(new VertexToTuple2Map<K, VV>()); } /** * @return the edge DataSet as Tuple3. */ - @SuppressWarnings({ "unchecked" }) public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() { - return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; + return edges.map(new EdgeToTuple3Map<K, EV>()); } - /** - * Apply a function to the attribute of each vertex in the graph. - * @param mapper the map function to apply. - * @return a new graph - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) + /** + * Apply a function to the attribute of each vertex in the graph. + * + * @param mapper the map function to apply. + * @return a new graph + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) { - TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); + TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); - TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) - new TupleTypeInfo(Vertex.class, keyType, valueType); + TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo( + Vertex.class, keyType, valueType); - DataSet<Vertex<K, NV>> mappedVertices = vertices - .map(new MapFunction<Vertex<K,VV>, Vertex<K, NV>>() { - public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { - return new Vertex<K, NV>(value.f0, mapper.map(value)); - } - }) - .returns(returnType); + DataSet<Vertex<K, NV>> mappedVertices = vertices.map( + new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() { + public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { + return new Vertex<K, NV>(value.f0, mapper.map(value)); + } + }).returns(returnType); - return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context); - } + return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context); + } - /** - * Apply a function to the attribute of each edge in the graph. - * @param mapper the map function to apply. - * @return a new graph - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) + /** + * Apply a function to the attribute of each edge in the graph. + * + * @param mapper the map function to apply. + * @return a new graph + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { - TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - TypeInformation<NV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); - TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) - new TupleTypeInfo(Edge.class, keyType, keyType, valueType); + TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo( + Edge.class, keyType, keyType, valueType); - DataSet<Edge<K, NV>> mappedEdges = edges.map(new MapFunction<Edge<K, EV>, Edge<K, NV>>() { - public Edge<K, NV> map(Edge<K, EV> value) throws Exception { - return new Edge<K, NV>(value.f0, value.f1, mapper.map(value)); - } - }) - .returns(returnType); + DataSet<Edge<K, NV>> mappedEdges = edges.map( + new MapFunction<Edge<K, EV>, Edge<K, NV>>() { + public Edge<K, NV> map(Edge<K, EV> value) throws Exception { + return new Edge<K, NV>(value.f0, value.f1, mapper + .map(value)); + } + }).returns(returnType); - return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); - } + return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); + } /** - * Joins the vertex DataSet of this graph with an input DataSet and applies a UDF on the resulted values. + * Joins the vertex DataSet of this graph with an input DataSet and applies + * a UDF on the resulted values. + * * @param inputDataSet the DataSet to join with. * @param mapper the UDF map function to apply. * @return a new graph where the vertex values have been updated. */ - public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, + public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, final MapFunction<Tuple2<VV, T>, VV> mapper) { DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() @@ -392,75 +393,78 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context); } - private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, - VV extends Serializable, T> implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { + private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, VV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { private MapFunction<Tuple2<VV, T>, VV> mapper; + public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) { this.mapper = mapper; } @Override - public void coGroup(Iterable<Vertex<K, VV>> vertices, Iterable<Tuple2<K, T>> input, - Collector<Vertex<K, VV>> collector) throws Exception { + public void coGroup(Iterable<Vertex<K, VV>> vertices, + Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception { final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator(); final Iterator<Tuple2<K, T>> inputIterator = input.iterator(); if (vertexIterator.hasNext()) { - if(inputIterator.hasNext()) { + if (inputIterator.hasNext()) { final Tuple2<K, T> inputNext = inputIterator.next(); collector.collect(new Vertex<K, VV>(inputNext.f0, mapper - .map(new Tuple2<VV, T>(vertexIterator.next().f1, inputNext.f1)))); + .map(new Tuple2<VV, T>(vertexIterator.next().f1, + inputNext.f1)))); } else { collector.collect(vertexIterator.next()); } - + } } } /** - * Joins the edge DataSet with an input DataSet on a composite key of both source and target - * and applies a UDF on the resulted values. + * Joins the edge DataSet with an input DataSet on a composite key of both + * source and target and applies a UDF on the resulted values. + * * @param inputDataSet the DataSet to join with. * @param mapper the UDF map function to apply. - * @param <T> + * @param <T> the return type * @return a new graph where the edge values have been updated. */ - public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, + public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, final MapFunction<Tuple2<EV, T>, EV> mapper) { DataSet<Edge<K, EV>> resultedEdges = this.getEdges() - .coGroup(inputDataSet).where(0,1).equalTo(0,1) + .coGroup(inputDataSet).where(0, 1).equalTo(0, 1) .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper)); return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, - EV extends Serializable, T> + private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, EV extends Serializable, T> implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> { private MapFunction<Tuple2<EV, T>, EV> mapper; + public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) { this.mapper = mapper; } @Override - public void coGroup(Iterable<Edge<K, EV>> edges, - Iterable<Tuple3<K, K, T>> input, - Collector<Edge<K, EV>> collector) throws Exception { + public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input, + Collector<Edge<K, EV>> collector) throws Exception { final Iterator<Edge<K, EV>> edgesIterator = edges.iterator(); final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator(); if (edgesIterator.hasNext()) { - if(inputIterator.hasNext()) { + if (inputIterator.hasNext()) { final Tuple3<K, K, T> inputNext = inputIterator.next(); - collector.collect(new Edge<K, EV>(inputNext.f0, inputNext.f1, mapper - .map(new Tuple2<EV, T>(edgesIterator.next().f2, inputNext.f2)))); + collector.collect(new Edge<K, EV>(inputNext.f0, + inputNext.f1, mapper.map(new Tuple2<EV, T>( + edgesIterator.next().f2, inputNext.f2)))); } else { collector.collect(edgesIterator.next()); } @@ -469,12 +473,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Joins the edge DataSet with an input DataSet on the source key of the edges and the first attribute - * of the input DataSet and applies a UDF on the resulted values. - * In case the inputDataSet contains the same key more than once, only the first value will be considered. + * Joins the edge DataSet with an input DataSet on the source key of the + * edges and the first attribute of the input DataSet and applies a UDF on + * the resulted values. In case the inputDataSet contains the same key more + * than once, only the first value will be considered. + * * @param inputDataSet the DataSet to join with. * @param mapper the UDF map function to apply. - * @param <T> + * @param <T> the return type * @return a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet, @@ -487,33 +493,36 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); } - private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, - EV extends Serializable, T> implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { + private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, EV extends Serializable, T> + implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { private MapFunction<Tuple2<EV, T>, EV> mapper; - public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> mapper) { + + public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget( + MapFunction<Tuple2<EV, T>, EV> mapper) { this.mapper = mapper; } @Override - public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>> input, - Collector<Edge<K, EV>> collector) throws Exception { + public void coGroup(Iterable<Edge<K, EV>> edges, + Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception { final Iterator<Edge<K, EV>> edgesIterator = edges.iterator(); final Iterator<Tuple2<K, T>> inputIterator = input.iterator(); - if(inputIterator.hasNext()) { + if (inputIterator.hasNext()) { final Tuple2<K, T> inputNext = inputIterator.next(); - while(edgesIterator.hasNext()) { + while (edgesIterator.hasNext()) { Edge<K, EV> edgesNext = edgesIterator.next(); - collector.collect(new Edge<K, EV>(edgesNext.f0, edgesNext.f1, mapper - .map(new Tuple2<EV, T>(edgesNext.f2, inputNext.f1)))); + collector.collect(new Edge<K, EV>(edgesNext.f0, + edgesNext.f1, mapper.map(new Tuple2<EV, T>( + edgesNext.f2, inputNext.f1)))); } } else { - while(edgesIterator.hasNext()) { + while (edgesIterator.hasNext()) { collector.collect(edgesIterator.next()); } } @@ -521,12 +530,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Joins the edge DataSet with an input DataSet on the target key of the edges and the first attribute - * of the input DataSet and applies a UDF on the resulted values. - * Should the inputDataSet contain the same key more than once, only the first value will be considered. + * Joins the edge DataSet with an input DataSet on the target key of the + * edges and the first attribute of the input DataSet and applies a UDF on + * the resulted values. Should the inputDataSet contain the same key more + * than once, only the first value will be considered. + * * @param inputDataSet the DataSet to join with. * @param mapper the UDF map function to apply. - * @param <T> + * @param <T> the return type * @return a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet, @@ -540,32 +551,32 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Apply filtering functions to the graph - * and return a sub-graph that satisfies the predicates - * for both vertices and edges. + * Apply filtering functions to the graph and return a sub-graph that + * satisfies the predicates for both vertices and edges. + * * @param vertexFilter the filter function for vertices. * @param edgeFilter the filter function for edges. * @return the resulting sub-graph. */ public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) { - DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); + DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); - DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices) - .where(0).equalTo(0) - .with(new ProjectEdge<K, VV, EV>()) - .join(filteredVertices).where(1).equalTo(0) - .with(new ProjectEdge<K, VV, EV>()); + DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices) + .where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>()) + .join(filteredVertices).where(1).equalTo(0) + .with(new ProjectEdge<K, VV, EV>()); - DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter); + DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter); - return new Graph<K, VV, EV>(filteredVertices, filteredEdges, this.context); - } + return new Graph<K, VV, EV>(filteredVertices, filteredEdges, + this.context); + } /** - * Apply a filtering function to the graph - * and return a sub-graph that satisfies the predicates - * only for the vertices. + * Apply a filtering function to the graph and return a sub-graph that + * satisfies the predicates only for the vertices. + * * @param vertexFilter the filter function for vertices. * @return the resulting sub-graph. */ @@ -574,8 +585,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices) - .where(0).equalTo(0) - .with(new ProjectEdge<K, VV, EV>()) + .where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>()) .join(filteredVertices).where(1).equalTo(0) .with(new ProjectEdge<K, VV, EV>()); @@ -583,9 +593,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Apply a filtering function to the graph - * and return a sub-graph that satisfies the predicates - * only for the edges. + * Apply a filtering function to the graph and return a sub-graph that + * satisfies the predicates only for the edges. + * * @param edgeFilter the filter function for edges. * @return the resulting sub-graph. */ @@ -595,32 +605,29 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context); } - @ConstantFieldsFirst("0->0;1->1;2->2") - private static final class ProjectEdge<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<Edge<K,EV>, Vertex<K,VV>, - Edge<K,EV>> { - public void join(Edge<K, EV> first, - Vertex<K, VV> second, Collector<Edge<K, EV>> out) { + @ConstantFieldsFirst("0->0;1->1;2->2") + private static final class ProjectEdge<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> { + public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) { out.collect(first); } - } - - /** - * Return the out-degree of all vertices in the graph - * @return A DataSet of Tuple2<vertexId, outDegree> - */ + } + + /** + * Return the out-degree of all vertices in the graph + * + * @return A DataSet of Tuple2<vertexId, outDegree> + */ public DataSet<Tuple2<K, Long>> outDegrees() { - return vertices.coGroup(edges).where(0).equalTo(0) - .with(new CountNeighborsCoGroup<K, VV, EV>()); + return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>()); } - private static final class CountNeighborsCoGroup<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements CoGroupFunction<Vertex<K, VV>, - Edge<K, EV>, Tuple2<K, Long>> { + private static final class CountNeighborsCoGroup<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> { @SuppressWarnings("unused") - public void coGroup(Iterable<Vertex<K, VV>> vertex, - Iterable<Edge<K, EV>> outEdges, Collector<Tuple2<K, Long>> out) { + public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges, + Collector<Tuple2<K, Long>> out) { long count = 0; for (Edge<K, EV> edge : outEdges) { count++; @@ -628,19 +635,20 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab out.collect(new Tuple2<K, Long>(vertex.iterator().next().f0, count)); } } - + /** * Return the in-degree of all vertices in the graph + * * @return A DataSet of Tuple2<vertexId, inDegree> */ public DataSet<Tuple2<K, Long>> inDegrees() { - return vertices.coGroup(edges).where(0).equalTo(1) - .with(new CountNeighborsCoGroup<K, VV, EV>()); + return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>()); } /** * Return the degree of all vertices in the graph + * * @return A DataSet of Tuple2<vertexId, degree> */ public DataSet<Tuple2<K, Long>> getDegrees() { @@ -648,22 +656,26 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * This operation adds all inverse-direction edges - * to the graph. + * This operation adds all inverse-direction edges to the graph. + * * @return the undirected graph. */ - public Graph<K, VV, EV> getUndirected() throws UnsupportedOperationException { - DataSet<Edge<K, EV>> undirectedEdges = - edges.union(edges.map(new ReverseEdgesMap<K, EV>())); - return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context); + public Graph<K, VV, EV> getUndirected() { + + DataSet<Edge<K, EV>> undirectedEdges = edges.union(edges.map(new ReverseEdgesMap<K, EV>())); + return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context); } - + /** - * Compute an aggregate over the edges of each vertex. - * The function applied on the edges has access to the vertex value. - * @param edgesFunction the function to apply to the neighborhood - * @param direction the edge direction (in-, out-, all-) - * @param <T> the output type + * Compute an aggregate over the edges of each vertex. The function applied + * on the edges has access to the vertex value. + * + * @param edgesFunction + * the function to apply to the neighborhood + * @param direction + * the edge direction (in-, out-, all-) + * @param <T> + * the output type * @return a dataset of a T * @throws IllegalArgumentException */ @@ -672,27 +684,29 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab switch (direction) { case IN: - return vertices.coGroup(edges).where(0).equalTo(1).with( - new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); + return vertices.coGroup(edges).where(0).equalTo(1) + .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); case OUT: - return vertices.coGroup(edges).where(0).equalTo(0).with( - new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); + return vertices.coGroup(edges).where(0).equalTo(0) + .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); case ALL: return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())) - .where(0).equalTo(0) - .with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)); + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } } /** - * Compute an aggregate over the edges of each vertex. - * The function applied on the edges only has access to the vertex id - * (not the vertex value). - * @param edgesFunction the function to apply to the neighborhood - * @param direction the edge direction (in-, out-, all-) - * @param <T> the output type + * Compute an aggregate over the edges of each vertex. The function applied + * on the edges only has access to the vertex id (not the vertex value). + * + * @param edgesFunction + * the function to apply to the neighborhood + * @param direction + * the edge direction (in-, out-, all-) + * @param <T> + * the output type * @return a dataset of T * @throws IllegalArgumentException */ @@ -707,15 +721,15 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab return edges.map(new ProjectVertexIdMap<K, EV>(0)) .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); case ALL: - return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()).groupBy(0) - .reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); + return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()) + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } } - private static final class ProjectVertexIdMap<K extends Comparable<K> & Serializable, - EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { + private static final class ProjectVertexIdMap<K extends Comparable<K> & Serializable, EV extends Serializable> + implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { private int fieldPosition; @@ -725,13 +739,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab @SuppressWarnings("unchecked") public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) { - return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge); + return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge); } } - private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, - EV extends Serializable, T> implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, - ResultTypeQueryable<T> { + private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable, T> + implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> { private EdgesFunction<K, EV, T> function; @@ -739,8 +752,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab this.function = fun; } - public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, - Collector<T> out) throws Exception { + public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception { out.collect(function.iterateEdges(edges)); } @@ -750,34 +762,31 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - private static final class EmitOneEdgePerNode<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatMapFunction< - Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { + private static final class EmitOneEdgePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) { out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge)); out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge)); } } - private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatMapFunction< - Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { + private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) { out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge)); out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge)); } } - private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>, - ResultTypeQueryable<T> { - + private static final class ApplyCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> { + private EdgesFunctionWithVertexValue<K, VV, EV, T> function; - - public ApplyCoGroupFunction (EdgesFunctionWithVertexValue<K, VV, EV, T> fun) { + + public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) { this.function = fun; } + public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception { out.collect(function.iterateEdges(vertex.iterator().next(), edges)); @@ -785,63 +794,62 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab @Override public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null); + return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, + null, null); } } - private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, - ResultTypeQueryable<T> { + private static final class ApplyCoGroupFunctionOnAllEdges<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> { - private EdgesFunctionWithVertexValue<K, VV, EV, T> function; + private EdgesFunctionWithVertexValue<K, VV, EV, T> function; - public ApplyCoGroupFunctionOnAllEdges (EdgesFunctionWithVertexValue<K, VV, EV, T> fun) { - this.function = fun; - } + public ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) { + this.function = fun; + } - public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, - Collector<T> out) throws Exception { + public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, + Collector<T> out) throws Exception { - final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K,EV>>() { + final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K, EV>>() { - final Iterator<Tuple2<K, Edge<K, EV>>> keysWithEdgesIterator = keysWithEdges.iterator(); + final Iterator<Tuple2<K, Edge<K, EV>>> keysWithEdgesIterator = keysWithEdges.iterator(); - @Override - public boolean hasNext() { - return keysWithEdgesIterator.hasNext(); - } + @Override + public boolean hasNext() { + return keysWithEdgesIterator.hasNext(); + } - @Override - public Edge<K, EV> next() { - return keysWithEdgesIterator.next().f1; - } + @Override + public Edge<K, EV> next() { + return keysWithEdgesIterator.next().f1; + } - @Override - public void remove() { - keysWithEdgesIterator.remove(); - } - }; + @Override + public void remove() { + keysWithEdgesIterator.remove(); + } + }; - Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K,EV>>() { - public Iterator<Edge<K, EV>> iterator() { - return edgesIterator; - } - }; + Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K, EV>>() { + public Iterator<Edge<K, EV>> iterator() { + return edgesIterator; + } + }; - out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable)); - } + out.collect(function.iterateEdges(vertex.iterator().next(), edgesIterable)); + } - @Override - public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null); + @Override + public TypeInformation<T> getProducedType() { + return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, + null, null); + } } -} @ConstantFields("0->1;1->0;2->2") - private static final class ReverseEdgesMap<K extends Comparable<K> & Serializable, - EV extends Serializable> implements MapFunction<Edge<K, EV>, - Edge<K, EV>> { + private static final class ReverseEdgesMap<K extends Comparable<K> & Serializable, EV extends Serializable> + implements MapFunction<Edge<K, EV>, Edge<K, EV>> { public Edge<K, EV> map(Edge<K, EV> value) { return new Edge<K, EV>(value.f1, value.f0, value.f2); @@ -850,6 +858,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * Reverse the direction of the edges in the graph + * * @return a new graph with all edges reversed * @throws UnsupportedOperationException */ @@ -861,273 +870,278 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab /** * @return Singleton DataSet containing the vertex count */ - public DataSet<Integer> numberOfVertices () { - return GraphUtils.count(vertices, context); - } + public DataSet<Integer> numberOfVertices() { + return GraphUtils.count(vertices, context); + } /** * @return Singleton DataSet containing the edge count */ - public DataSet<Integer> numberOfEdges () { - return GraphUtils.count(edges, context); - } - - /** - * @return The IDs of the vertices as DataSet - */ - public DataSet<K> getVertexIds () { - return vertices.map(new ExtractVertexIDMapper<K, VV>()); - } - - private static final class ExtractVertexIDMapper<K extends Comparable<K> & Serializable, - VV extends Serializable> implements MapFunction<Vertex<K, VV>, K> { - @Override - public K map(Vertex<K, VV> vertex) { - return vertex.f0; - } - } - - /** - * @return The IDs of the edges as DataSet - */ - public DataSet<Tuple2<K, K>> getEdgeIds () { - return edges.map(new ExtractEdgeIDsMapper<K, EV>()); - } - - private static final class ExtractEdgeIDsMapper<K extends Comparable<K> & Serializable, - EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple2<K, K>> { - @Override - public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception { - return new Tuple2<K,K>(edge.f0, edge.f1); - } - } + public DataSet<Integer> numberOfEdges() { + return GraphUtils.count(edges, context); + } + + /** + * @return The IDs of the vertices as DataSet + */ + public DataSet<K> getVertexIds() { + return vertices.map(new ExtractVertexIDMapper<K, VV>()); + } + + private static final class ExtractVertexIDMapper<K extends Comparable<K> & Serializable, VV extends Serializable> + implements MapFunction<Vertex<K, VV>, K> { + @Override + public K map(Vertex<K, VV> vertex) { + return vertex.f0; + } + } + + /** + * @return The IDs of the edges as DataSet + */ + public DataSet<Tuple2<K, K>> getEdgeIds() { + return edges.map(new ExtractEdgeIDsMapper<K, EV>()); + } + + private static final class ExtractEdgeIDsMapper<K extends Comparable<K> & Serializable, EV extends Serializable> + implements MapFunction<Edge<K, EV>, Tuple2<K, K>> { + @Override + public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception { + return new Tuple2<K, K>(edge.f0, edge.f1); + } + } /** * Checks the weak connectivity of a graph. - * @param maxIterations the maximum number of iterations for the inner delta iteration + * + * @param maxIterations + * the maximum number of iterations for the inner delta iteration * @return true if the graph is weakly connected. */ - public DataSet<Boolean> isWeaklyConnected (int maxIterations) { + public DataSet<Boolean> isWeaklyConnected(int maxIterations) { // first, convert to an undirected graph Graph<K, VV, EV> graph = this.getUndirected(); DataSet<K> vertexIds = graph.getVertexIds(); - DataSet<Tuple2<K,K>> verticesWithInitialIds = vertexIds - .map(new DuplicateVertexIDMapper<K>()); - - DataSet<Tuple2<K,K>> edgeIds = graph.getEdgeIds(); - - DeltaIteration<Tuple2<K,K>, Tuple2<K,K>> iteration = verticesWithInitialIds - .iterateDelta(verticesWithInitialIds, maxIterations, 0); - - DataSet<Tuple2<K, K>> changes = iteration.getWorkset() - .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE) - .where(0).equalTo(0) - .with(new FindNeighborsJoin<K>()) - .groupBy(0) - .aggregate(Aggregations.MIN, 1) - .join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE) - .where(0).equalTo(0) - .with(new VertexWithNewComponentJoin<K>()); - - DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes); - DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup( - new EmitFirstReducer<K>()), context).map(new CheckIfOneComponentMapper()); - return result; - } - + DataSet<Tuple2<K, K>> verticesWithInitialIds = vertexIds + .map(new DuplicateVertexIDMapper<K>()); + + DataSet<Tuple2<K, K>> edgeIds = graph.getEdgeIds(); + + DeltaIteration<Tuple2<K, K>, Tuple2<K, K>> iteration = verticesWithInitialIds + .iterateDelta(verticesWithInitialIds, maxIterations, 0); + + DataSet<Tuple2<K, K>> changes = iteration.getWorkset() + .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE) + .where(0).equalTo(0).with(new FindNeighborsJoin<K>()) + .groupBy(0).aggregate(Aggregations.MIN, 1) + .join(iteration.getSolutionSet(), JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0) + .with(new VertexWithNewComponentJoin<K>()); + + DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, changes); + DataSet<Boolean> result = GraphUtils.count(components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()), + context).map(new CheckIfOneComponentMapper()); + return result; + } + private static final class DuplicateVertexIDMapper<K> implements MapFunction<K, Tuple2<K, K>> { - @Override - public Tuple2<K, K> map(K k) { - return new Tuple2<K, K>(k, k); - } - } - - private static final class FindNeighborsJoin<K> implements JoinFunction<Tuple2<K, K>, Tuple2<K, K>, - Tuple2<K, K>> { - @Override - public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, K> edge) { - return new Tuple2<K,K>(edge.f1, vertexWithComponent.f1); - } - } - - private static final class VertexWithNewComponentJoin<K extends Comparable<K>> - implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> { - @Override - public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, Collector<Tuple2<K, K>> out) { - if (candidate.f1.compareTo(old.f1) < 0) { - out.collect(candidate); - } - } - } - - private static final class EmitFirstReducer<K> implements - GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> { + @Override + public Tuple2<K, K> map(K k) { + return new Tuple2<K, K>(k, k); + } + } + + private static final class FindNeighborsJoin<K> implements JoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> { + @Override + public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, K> edge) { + return new Tuple2<K, K>(edge.f1, vertexWithComponent.f1); + } + } + + private static final class VertexWithNewComponentJoin<K extends Comparable<K>> + implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> { + @Override + public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, Collector<Tuple2<K, K>> out) { + if (candidate.f1.compareTo(old.f1) < 0) { + out.collect(candidate); + } + } + } + + private static final class EmitFirstReducer<K> implements GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> { public void reduce(Iterable<Tuple2<K, K>> values, Collector<Tuple2<K, K>> out) { - out.collect(values.iterator().next()); + out.collect(values.iterator().next()); } } - - private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> { - @Override - public Boolean map(Integer n) { - return (n == 1); - } + + private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> { + @Override + public Boolean map(Integer n) { + return (n == 1); + } } - + /** - * Adds the input vertex and edges to the graph. - * If the vertex already exists in the graph, it will not be added again, - * but the given edges will. + * Adds the input vertex and edges to the graph. If the vertex already + * exists in the graph, it will not be added again, but the given edges + * will. + * * @param vertex the vertex to add to the graph * @param edges a list of edges to add to the grap - * @return the new graph containing the existing and newly added vertices and edges + * @return the new graph containing the existing and newly added vertices + * and edges + */ + @SuppressWarnings("unchecked") + public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, List<Edge<K, EV>> edges) { + DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex); + + // Take care of empty edge set + if (edges.isEmpty()) { + return new Graph<K, VV, EV>(this.vertices.union(newVertex) + .distinct(), this.edges, this.context); + } + + // Add the vertex and its edges + DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct(); + DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges)); + + return new Graph<K, VV, EV>(newVertices, newEdges, this.context); + } + + /** + * Adds the given edge to the graph. If the source and target vertices do + * not exist in the graph, they will also be added. + * + * @param source the source vertex of the edge + * @param target the target vertex of the edge + * @param edgeValue the edge value + * @return the new graph containing the existing vertices and edges plus the + * newly added edge */ @SuppressWarnings("unchecked") - public Graph<K, VV, EV> addVertex (final Vertex<K,VV> vertex, List<Edge<K, EV>> edges) { - DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex); - - // Take care of empty edge set - if (edges.isEmpty()) { - return new Graph<K, VV, EV>(this.vertices.union(newVertex).distinct(), this.edges, this.context); - } - - // Add the vertex and its edges - DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct(); - DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges)); - - return new Graph<K, VV, EV>(newVertices, newEdges, this.context); - } - - /** - * Adds the given edge to the graph. - * If the source and target vertices do not exist in the graph, - * they will also be added. - * @param source the source vertex of the edge - * @param target the target vertex of the edge - * @param edgeValue the edge value - * @return the new graph containing the existing vertices and edges plus the newly added edge - */ - public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, EV edgeValue) { - Graph<K,VV,EV> partialGraph = fromCollection( - Arrays.asList(source, target), + public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) { + Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target), Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)), - this.context - ); - return this.union(partialGraph); - } + this.context); + return this.union(partialGraph); + } /** * Removes the given vertex and its edges from the graph. + * * @param vertex the vertex to remove - * @return the new graph containing the existing vertices and edges without the removed vertex and its edges + * @return the new graph containing the existing vertices and edges without + * the removed vertex and its edges */ - public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) { - - DataSet<Vertex<K, VV>> newVertices = getVertices().filter( - new RemoveVertexFilter<K, VV>(vertex)); - DataSet<Edge<K, EV>> newEdges = getEdges().filter( - new VertexRemovalEdgeFilter<K, VV, EV>(vertex)); - return new Graph<K, VV, EV>(newVertices, newEdges, this.context); - } - - private static final class RemoveVertexFilter<K extends Comparable<K> & Serializable, - VV extends Serializable> implements FilterFunction<Vertex<K, VV>> { - - private Vertex<K, VV> vertexToRemove; - - public RemoveVertexFilter(Vertex<K, VV> vertex) { - vertexToRemove = vertex; + public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) { + + DataSet<Vertex<K, VV>> newVertices = getVertices().filter(new RemoveVertexFilter<K, VV>(vertex)); + DataSet<Edge<K, EV>> newEdges = getEdges().filter(new VertexRemovalEdgeFilter<K, VV, EV>(vertex)); + return new Graph<K, VV, EV>(newVertices, newEdges, this.context); + } + + private static final class RemoveVertexFilter<K extends Comparable<K> & Serializable, VV extends Serializable> + implements FilterFunction<Vertex<K, VV>> { + + private Vertex<K, VV> vertexToRemove; + + public RemoveVertexFilter(Vertex<K, VV> vertex) { + vertexToRemove = vertex; } - @Override - public boolean filter(Vertex<K, VV> vertex) throws Exception { - return !vertex.f0.equals(vertexToRemove.f0); - } - } - - private static final class VertexRemovalEdgeFilter<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements FilterFunction<Edge<K, EV>> { + @Override + public boolean filter(Vertex<K, VV> vertex) throws Exception { + return !vertex.f0.equals(vertexToRemove.f0); + } + } + + private static final class VertexRemovalEdgeFilter<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FilterFunction<Edge<K, EV>> { - private Vertex<K, VV> vertexToRemove; + private Vertex<K, VV> vertexToRemove; - public VertexRemovalEdgeFilter(Vertex<K, VV> vertex) { + public VertexRemovalEdgeFilter(Vertex<K, VV> vertex) { vertexToRemove = vertex; } - @Override - public boolean filter(Edge<K, EV> edge) throws Exception { - - if (edge.f0.equals(vertexToRemove.f0)) { - return false; - } - if (edge.f1.equals(vertexToRemove.f0)) { - return false; - } - return true; - } - } - - /** - * Removes all edges that match the given edge from the graph. - * @param edge the edge to remove - * @return the new graph containing the existing vertices and edges without the removed edges - */ - public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) { - DataSet<Edge<K, EV>> newEdges = getEdges().filter( - new EdgeRemovalEdgeFilter<K, EV>(edge)); - return new Graph<K, VV, EV>(this.vertices, newEdges, this.context); - } - - private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FilterFunction<Edge<K, EV>> { - private Edge<K, EV> edgeToRemove; - - public EdgeRemovalEdgeFilter(Edge<K, EV> edge) { + @Override + public boolean filter(Edge<K, EV> edge) throws Exception { + + if (edge.f0.equals(vertexToRemove.f0)) { + return false; + } + if (edge.f1.equals(vertexToRemove.f0)) { + return false; + } + return true; + } + } + + /** + * Removes all edges that match the given edge from the graph. + * + * @param edge the edge to remove + * @return the new graph containing the existing vertices and edges without + * the removed edges + */ + public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) { + DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<K, EV>(edge)); + return new Graph<K, VV, EV>(this.vertices, newEdges, this.context); + } + + private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, EV extends Serializable> + implements FilterFunction<Edge<K, EV>> { + private Edge<K, EV> edgeToRemove; + + public EdgeRemovalEdgeFilter(Edge<K, EV> edge) { edgeToRemove = edge; } - @Override - public boolean filter(Edge<K, EV> edge) { - return (!(edge.f0.equals(edgeToRemove.f0) - && edge.f1.equals(edgeToRemove.f1))); - } - } - - /** - * Performs union on the vertices and edges sets of the input graphs - * removing duplicate vertices but maintaining duplicate edges. - * @param graph the graph to perform union with - * @return a new graph - */ - public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) { - DataSet<Vertex<K,VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); - DataSet<Edge<K,EV>> unionedEdges = graph.getEdges().union(this.getEdges()); - return new Graph<K,VV,EV>(unionedVertices, unionedEdges, this.context); - } + @Override + public boolean filter(Edge<K, EV> edge) { + return (!(edge.f0.equals(edgeToRemove.f0) && edge.f1 + .equals(edgeToRemove.f1))); + } + } + + /** + * Performs union on the vertices and edges sets of the input graphs + * removing duplicate vertices but maintaining duplicate edges. + * + * @param graph the graph to perform union with + * @return a new graph + */ + public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) { + + DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); + DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges()); + return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context); + } /** * Runs a Vertex-Centric iteration on the graph. + * * @param vertexUpdateFunction the vertex update function * @param messagingFunction the messaging function * @param maximumNumberOfIterations maximum number of iterations to perform * @return */ - public <M>Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, - MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { - DataSet<Vertex<K, VV>> newVertices = vertices.runOperation( - VertexCentricIteration.withEdges(edges, - vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); + public <M> Graph<K, VV, EV> runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction, + MessagingFunction<K, VV, M, EV> messagingFunction, int maximumNumberOfIterations) { + DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(VertexCentricIteration + .withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); return new Graph<K, VV, EV>(newVertices, this.edges, this.context); - } + } - public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) { + public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) { return algorithm.run(this); } /** - * Compute an aggregate over the neighbors (edges and vertices) of each vertex. - * The function applied on the neighbors has access to the vertex value. + * Compute an aggregate over the neighbors (edges and vertices) of each + * vertex. The function applied on the neighbors has access to the vertex + * value. + * * @param neighborsFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) * @param <T> the output type @@ -1139,23 +1153,27 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab switch (direction) { case IN: // create <edge-sourceVertex> pairs - DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges.join(this.vertices) - .where(0).equalTo(0); - return vertices.coGroup(edgesWithSources).where(0).equalTo("f0.f1").with( - new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges + .join(this.vertices).where(0).equalTo(0); + return vertices.coGroup(edgesWithSources) + .where(0).equalTo("f0.f1") + .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); case OUT: // create <edge-targetVertex> pairs - DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges.join(this.vertices) - .where(1).equalTo(0); - return vertices.coGroup(edgesWithTargets).where(0).equalTo("f0.f0").with( - new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges + .join(this.vertices).where(1).equalTo(0); + return vertices.coGroup(edgesWithTargets) + .where(0).equalTo("f0.f0") + .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); case ALL: // create <edge-sourceOrTargetVertex> pairs - DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges.flatMap( - new EmitOneEdgeWithNeighborPerNode<K, VV, EV>()).join(this.vertices) - .where(1).equalTo(0).with(new ProjectEdgeWithNeighbor<K, VV, EV>()); + DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges + .flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>()) + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectEdgeWithNeighbor<K, VV, EV>()); - return vertices.coGroup(edgesWithNeighbors).where(0).equalTo(0) + return vertices.coGroup(edgesWithNeighbors) + .where(0).equalTo(0) .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); @@ -1163,9 +1181,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Compute an aggregate over the neighbors (edges and vertices) of each vertex. - * The function applied on the neighbors only has access to the vertex id - * (not the vertex value). + * Compute an aggregate over the neighbors (edges and vertices) of each + * vertex. The function applied on the neighbors only has access to the + * vertex id (not the vertex value). + * * @param neighborsFunction the function to apply to the neighborhood * @param direction the edge direction (in-, out-, all-) * @param <T> the output type @@ -1177,21 +1196,24 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab switch (direction) { case IN: // create <edge-sourceVertex> pairs - DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges.join(this.vertices) - .where(0).equalTo(0).with(new ProjectVertexIdJoin<K, VV, EV>(1)); + DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges + .join(this.vertices).where(0).equalTo(0) + .with(new ProjectVertexIdJoin<K, VV, EV>(1)); return edgesWithSources.groupBy(0).reduceGroup( new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); case OUT: // create <edge-targetVertex> pairs - DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges.join(this.vertices) - .where(1).equalTo(0).with(new ProjectVertexIdJoin<K, VV, EV>(0)); - return edgesWithTargets.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); + DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectVertexIdJoin<K, VV, EV>(0)); + return edgesWithTargets.groupBy(0).reduceGroup( + new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); case ALL: // create <edge-sourceOrTargetVertex> pairs - DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges.flatMap( - new EmitOneEdgeWithNeighborPerNode<K, VV, EV>()).join(this.vertices) - .where(1).equalTo(0).with(new ProjectEdgeWithNeighbor<K, VV, EV>()); + DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges + .flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>()) + .join(this.vertices).where(1).equalTo(0) + .with(new ProjectEdgeWithNeighbor<K, VV, EV>()); return edgesWithNeighbors.groupBy(0).reduceGroup( new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); @@ -1200,125 +1222,118 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - private static final class ApplyNeighborGroupReduceFunction<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable, T> implements GroupReduceFunction< - Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { - + private static final class ApplyNeighborGroupReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> + implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { + private NeighborsFunction<K, VV, EV, T> function; - + public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) { this.function = fun; } - - public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, - Collector<T> out) throws Exception { + + public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception { out.collect(function.iterateNeighbors(edges)); - + } @Override public TypeInformation<T> getProducedType() { return TypeExtractor.createTypeInfo(NeighborsFunction.class, function.getClass(), 3, null, null); - } + } } - private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<Edge<K, EV>, - Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { + private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { private int fieldPosition; public ProjectVertexIdJoin(int position) { this.fieldPosition = position; } + @SuppressWarnings("unchecked") - public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, + public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { - out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>( - (K)edge.getField(fieldPosition), edge, otherVertex)); + out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex)); } } - private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable> implements - FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { + private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> { public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { - out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, - keysWithEdge.f2, neighbor)); + + out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor)); } } - private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, - ResultTypeQueryable<T> { - + private static final class ApplyNeighborCoGroupFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { + private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; - - public ApplyNeighborCoGroupFunction (NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { + + public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { this.function = fun; } - public void coGroup(Iterable<Vertex<K, VV>> vertex, - Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<T> out) throws Exception { - out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); + + public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, + Collector<T> out) throws Exception { + out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors)); } + @Override public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, - function.getClass(), 3, null, null); + return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } - private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, - VV extends Serializable, EV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, - ResultTypeQueryable<T> { + private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> { private NeighborsFunctionWithVertexValue<K, VV, EV, T> function; - - public ApplyCoGroupFunctionOnAllNeighbors (NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { + + public ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) { this.function = fun; } - public void coGroup(Iterable<Vertex<K, VV>> vertex, final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, + public void coGroup(Iterable<Vertex<K, VV>> vertex, + final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, Collector<T> out) throws Exception { - + final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { - - final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = - keysWithNeighbors.iterator(); - + + final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = keysWithNeighbors.iterator(); + @Override public boolean hasNext() { return keysWithEdgesIterator.hasNext(); } - + @Override public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() { - Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next(); + Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next(); return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2); } @Override public void remove() { keysWithEdgesIterator.remove(); - } + } }; - + Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() { public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() { return neighborsIterator; } }; - - out.collect(function.iterateNeighbors(vertex.iterator().next(), neighborsIterable)); - } + + out.collect(function.iterateNeighbors(vertex.iterator().next(), + neighborsIterable)); + } @Override public TypeInformation<T> getProducedType() { - return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, - function.getClass(), 3, null, null); + return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null); } } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java index 2f5de95..f5e7018 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java @@ -1,5 +1,22 @@ -package flink.graphs; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.graph; import java.io.Serializable; @@ -8,9 +25,7 @@ import java.io.Serializable; * @param <VV> vertex value type * @param <EV> edge value type */ -public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> { - - public Graph<K,VV,EV> run (Graph<K,VV,EV> input); +public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { + public Graph<K, VV, EV> run(Graph<K, VV, EV> input); } http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java index 124aea0..e0dfe11 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java @@ -1,4 +1,22 @@ -package flink.graphs; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java index d7b438c..c774024 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java @@ -1,4 +1,22 @@ -package flink.graphs; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java index e589096..0b19e0e 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package flink.graphs; +package org.apache.flink.graph; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java index 48a5fe4..33f8f1a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java @@ -1,4 +1,22 @@ -package flink.graphs.example; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; import java.util.Collection; @@ -10,11 +28,10 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.example.utils.ExampleUtils; import org.apache.flink.types.NullValue; -import flink.graphs.Graph; -import flink.graphs.example.utils.ExampleUtils; - /** * * A simple example to illustrate the basic functionality of the graph-api. @@ -42,7 +59,7 @@ public class GraphMetrics implements ProgramDescription { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); /** create a random graph **/ - Graph<Long, NullValue, NullValue> graph = Graph.create(ExampleUtils + Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(ExampleUtils .getRandomEdges(env, NUM_VERTICES), env); /** get the number of vertices **/ @@ -71,13 +88,13 @@ public class GraphMetrics implements ProgramDescription { DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId()); /** print the results **/ - ExampleUtils.printResult(numVertices, "Total number of vertices", env); - ExampleUtils.printResult(numEdges, "Total number of edges", env); - ExampleUtils.printResult(avgNodeDegree, "Average node degree", env); - ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree", env); - ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree", env); - ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree", env); - ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree", env); + ExampleUtils.printResult(numVertices, "Total number of vertices"); + ExampleUtils.printResult(numEdges, "Total number of edges"); + ExampleUtils.printResult(avgNodeDegree, "Average node degree"); + ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree"); + ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree"); + ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree"); + ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree"); env.execute(); }