[FLINK-1201] [gelly] use type hints for mapVertices, mapEdges and create
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d6a2001 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d6a2001 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d6a2001 Branch: refs/heads/master Commit: 1d6a2001abafa69d9acba3a6689b375402f851e1 Parents: 40457c2 Author: vasia <vasilikikala...@gmail.com> Authored: Thu Jan 15 18:14:39 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 144 +++++++------------ 1 file changed, 53 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d6a2001/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1990f26..fb72a20 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -57,8 +57,7 @@ import flink.graphs.utils.Tuple2ToVertexMap; import flink.graphs.validation.GraphValidator; @SuppressWarnings("serial") -public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> implements Serializable { +public class Graph<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> { private final ExecutionEnvironment context; private final DataSet<Vertex<K, VV>> vertices; @@ -104,76 +103,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param mapper * @return a new graph */ - public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) { + TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0); - DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new ApplyMapperToVertexWithType<K, VV, NV>(mapper, - keyType)); + + TypeInformation<NV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + + TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) + new TupleTypeInfo(Vertex.class, keyType, valueType); + + DataSet<Vertex<K, NV>> mappedVertices = vertices + .map(new MapFunction<Vertex<K,VV>, Vertex<K, NV>>() { + public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { + return new Vertex<K, NV>(value.f0, mapper.map(value)); + } + }) + .returns(returnType); + return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context); } - private static final class ApplyMapperToVertexWithType<K extends Comparable<K> & Serializable, - VV extends Serializable, NV extends Serializable> implements MapFunction - <Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, NV>> { - - private MapFunction<Vertex<K, VV>, NV> innerMapper; - private transient TypeInformation<K> keyType; - public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, NV> theMapper, TypeInformation<K> keyType) { - this.innerMapper = theMapper; - this.keyType = keyType; - } - - public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { - return new Vertex<K, NV>(value.f0, innerMapper.map(value)); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Vertex<K, NV>> getProducedType() { - TypeInformation<NV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); - @SuppressWarnings("rawtypes") - TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType); - return (TypeInformation<Vertex<K, NV>>) returnType; - } - } - /** * Apply a function to the attribute of each edge in the graph. * @param mapper * @return */ - public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) { + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - DataSet<Edge<K, NV>> mappedEdges = edges.map(new ApplyMapperToEdgeWithType<K, EV, NV>(mapper, - keyType)); - return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); - } - - private static final class ApplyMapperToEdgeWithType<K extends Comparable<K> & Serializable, - EV extends Serializable, NV extends Serializable> implements MapFunction - <Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>> { - - private MapFunction<Edge<K, EV>, NV> innerMapper; - private transient TypeInformation<K> keyType; - - public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> theMapper, TypeInformation<K> keyType) { - this.innerMapper = theMapper; - this.keyType = keyType; - } - - public Edge<K, NV> map(Edge<K, EV> value) throws Exception { - return new Edge<K, NV>(value.f0, value.f1, innerMapper.map(value)); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Edge<K, NV>> getProducedType() { - TypeInformation<NV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); - @SuppressWarnings("rawtypes") - TypeInformation<?> returnType = new TupleTypeInfo<Edge>(Edge.class, keyType, keyType, valueType); - return (TypeInformation<Edge<K, NV>>) returnType; + + TypeInformation<NV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + + TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) + new TupleTypeInfo(Edge.class, keyType, keyType, valueType); + + DataSet<Edge<K, NV>> mappedEdges = edges.map(new MapFunction<Edge<K, EV>, Edge<K, NV>>() { + public Edge<K, NV> map(Edge<K, EV> value) throws Exception { + return new Edge<K, NV>(value.f0, value.f1, mapper.map(value)); } + }) + .returns(returnType); + + return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); } /** @@ -473,6 +448,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab */ public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException { + switch (direction) { case IN: return vertices.coGroup(edges).where(0).equalTo(1).with( @@ -501,6 +477,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab */ public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction, EdgeDirection direction) throws IllegalArgumentException { + switch (direction) { case IN: return edges.map(new ProjectVertexIdMap<K, EV>(1)) @@ -544,7 +521,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception { out.collect(function.iterateEdges(edges)); - } @Override @@ -585,6 +561,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception { out.collect(function.iterateEdges(vertex.iterator().next(), edges)); } + @Override public TypeInformation<T> getProducedType() { return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3, null, null); @@ -624,7 +601,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab keysWithEdgesIterator.remove(); } }; - + Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K,EV>>() { public Iterator<Edge<K, EV>> iterator() { return edgesIterator; @@ -703,39 +680,24 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, ExecutionEnvironment context) { TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); + + TypeInformation<VV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) + new TupleTypeInfo(Vertex.class, keyType, valueType); + DataSet<Vertex<K, VV>> vertices = edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) - .distinct().map(new ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType)); + .distinct().map(new MapFunction<Tuple1<K>, Vertex<K, VV>>(){ + public Vertex<K, VV> map(Tuple1<K> value) throws Exception { + return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); + } + }).returns(returnType); return new Graph<K, VV, EV>(vertices, edges, context); } - - private static final class ApplyMapperToVertexValuesWithType<K extends Comparable<K> & Serializable, - VV extends Serializable> implements MapFunction - <Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>> { - - private MapFunction<K, VV> innerMapper; - private transient TypeInformation<K> keyType; - public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> theMapper, TypeInformation<K> keyType) { - this.innerMapper = theMapper; - this.keyType = keyType; - } - - public Vertex<K, VV> map(Tuple1<K> value) throws Exception { - return new Vertex<K, VV>(value.f0, innerMapper.map(value.f0)); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Vertex<K, VV>> getProducedType() { - TypeInformation<VV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, null, null); - @SuppressWarnings("rawtypes") - TypeInformation<?> returnType = new TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType); - return (TypeInformation<Vertex<K, VV>>) returnType; - } - } - private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { public void flatMap(Edge<K, EV> edge,