[FLINK-1201] [gelly] use type hints for mapVertices, mapEdges and create

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d6a2001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d6a2001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d6a2001

Branch: refs/heads/master
Commit: 1d6a2001abafa69d9acba3a6689b375402f851e1
Parents: 40457c2
Author: vasia <vasilikikala...@gmail.com>
Authored: Thu Jan 15 18:14:39 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 144 +++++++------------
 1 file changed, 53 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d6a2001/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 1990f26..fb72a20 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -57,8 +57,7 @@ import flink.graphs.utils.Tuple2ToVertexMap;
 import flink.graphs.validation.GraphValidator;
 
 @SuppressWarnings("serial")
-public class Graph<K extends Comparable<K> & Serializable, VV extends 
Serializable,
-       EV extends Serializable> implements Serializable {
+public class Graph<K extends Comparable<K> & Serializable, VV extends 
Serializable,    EV extends Serializable> {
 
     private final ExecutionEnvironment context;
        private final DataSet<Vertex<K, VV>> vertices;
@@ -104,76 +103,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
      * @param mapper
      * @return a new graph
      */
-    public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final 
MapFunction<Vertex<K, VV>, NV> mapper) {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+       public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final 
MapFunction<Vertex<K, VV>, NV> mapper) {
+
        TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
-       DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new 
ApplyMapperToVertexWithType<K, VV, NV>(mapper,
-                       keyType));
+
+       TypeInformation<NV> valueType = TypeExtractor
+                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+
+               TypeInformation<Vertex<K, NV>> returnType = 
(TypeInformation<Vertex<K, NV>>)
+                                       new TupleTypeInfo(Vertex.class, 
keyType, valueType);
+
+       DataSet<Vertex<K, NV>> mappedVertices = vertices
+                       .map(new MapFunction<Vertex<K,VV>, Vertex<K, NV>>() {
+                               public Vertex<K, NV> map(Vertex<K, VV> value) 
throws Exception {
+                                       return new Vertex<K, NV>(value.f0, 
mapper.map(value));
+                               }
+                               })
+                       .returns(returnType);
+
         return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), 
this.context);
     }
     
-    private static final class ApplyMapperToVertexWithType<K extends 
Comparable<K> & Serializable, 
-       VV extends Serializable, NV extends Serializable> implements MapFunction
-               <Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, 
NV>> {
-       
-               private MapFunction<Vertex<K, VV>, NV> innerMapper;
-               private transient TypeInformation<K> keyType;
-               public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, 
NV> theMapper, TypeInformation<K> keyType) {
-                       this.innerMapper = theMapper;
-                       this.keyType = keyType;
-               }
-               
-               public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
-                       return new Vertex<K, NV>(value.f0, 
innerMapper.map(value));
-               }
-       
-               @SuppressWarnings("unchecked")
-               @Override
-               public TypeInformation<Vertex<K, NV>> getProducedType() {
-                       TypeInformation<NV> valueType = TypeExtractor
-                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
-                       @SuppressWarnings("rawtypes")
-                       TypeInformation<?> returnType = new 
TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
-                       return (TypeInformation<Vertex<K, NV>>) returnType;
-               }
-    }
-
     /**
      * Apply a function to the attribute of each edge in the graph.
      * @param mapper
      * @return 
      */
-    public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final 
MapFunction<Edge<K, EV>, NV> mapper) {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+       public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final 
MapFunction<Edge<K, EV>, NV> mapper) {
+
        TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
-       DataSet<Edge<K, NV>> mappedEdges = edges.map(new 
ApplyMapperToEdgeWithType<K, EV, NV>(mapper,
-                       keyType));
-        return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
-    }
-    
-    private static final class ApplyMapperToEdgeWithType<K extends 
Comparable<K> & Serializable, 
-               EV extends Serializable, NV extends Serializable> implements 
MapFunction
-               <Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>> {
-       
-               private MapFunction<Edge<K, EV>, NV> innerMapper;
-               private transient TypeInformation<K> keyType;
-               
-               public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> 
theMapper, TypeInformation<K> keyType) {
-                       this.innerMapper = theMapper;
-                       this.keyType = keyType;
-               }
-               
-               public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
-                       return new Edge<K, NV>(value.f0, value.f1, 
innerMapper.map(value));
-               }
-       
-               @SuppressWarnings("unchecked")
-               @Override
-               public TypeInformation<Edge<K, NV>> getProducedType() {
-                       TypeInformation<NV> valueType = TypeExtractor
-                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
-                       @SuppressWarnings("rawtypes")
-                       TypeInformation<?> returnType = new 
TupleTypeInfo<Edge>(Edge.class, keyType, keyType, valueType);
-                       return (TypeInformation<Edge<K, NV>>) returnType;
+
+       TypeInformation<NV> valueType = TypeExtractor
+                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+
+               TypeInformation<Edge<K, NV>> returnType = 
(TypeInformation<Edge<K, NV>>)
+                               new TupleTypeInfo(Edge.class, keyType, keyType, 
valueType);
+
+       DataSet<Edge<K, NV>> mappedEdges = edges.map(new MapFunction<Edge<K, 
EV>, Edge<K, NV>>() {
+                       public Edge<K, NV> map(Edge<K, EV> value) throws 
Exception {
+                               return new Edge<K, NV>(value.f0, value.f1, 
mapper.map(value));
                        }
+               })
+               .returns(returnType);
+
+        return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
     }
 
        /**
@@ -473,6 +448,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         */
        public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, 
EV, T> edgesFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
+
                switch (direction) {
                case IN:
                        return vertices.coGroup(edges).where(0).equalTo(1).with(
@@ -501,6 +477,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         */
        public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> 
edgesFunction,
                        EdgeDirection direction) throws 
IllegalArgumentException {
+
                switch (direction) {
                case IN:
                        return edges.map(new ProjectVertexIdMap<K, EV>(1))
@@ -544,7 +521,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges,
                                Collector<T> out) throws Exception {
                        out.collect(function.iterateEdges(edges));
-                       
                }
 
                @Override
@@ -585,6 +561,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                                Iterable<Edge<K, EV>> edges, Collector<T> out) 
throws Exception {
                        
out.collect(function.iterateEdges(vertex.iterator().next(), edges));
                }
+
                @Override
                public TypeInformation<T> getProducedType() {
                        return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
@@ -624,7 +601,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                                keysWithEdgesIterator.remove();
                        }                       
                };
-               
+
                Iterable<Edge<K, EV>> edgesIterable = new 
Iterable<Edge<K,EV>>() {
                        public Iterator<Edge<K, EV>> iterator() {
                                return edgesIterator;
@@ -703,39 +680,24 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final 
MapFunction<K, VV> mapper, 
                                ExecutionEnvironment context) {
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
+
+               TypeInformation<VV> valueType = TypeExtractor
+                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+
+               @SuppressWarnings({ "unchecked", "rawtypes" })
+               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
+                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
+
                DataSet<Vertex<K, VV>> vertices = 
                                edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
-                               .distinct().map(new 
ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType));
+                               .distinct().map(new MapFunction<Tuple1<K>, 
Vertex<K, VV>>(){
+                                       public Vertex<K, VV> map(Tuple1<K> 
value) throws Exception {
+                                               return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
+                                       }
+                               }).returns(returnType);
                return new Graph<K, VV, EV>(vertices, edges, context);
        }
-       
-       private static final class ApplyMapperToVertexValuesWithType<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable> implements MapFunction
-               <Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>> {
-
-               private MapFunction<K, VV> innerMapper;
-               private transient TypeInformation<K> keyType;
 
-               public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> 
theMapper, TypeInformation<K> keyType) {
-                       this.innerMapper = theMapper;
-                       this.keyType = keyType;
-               }
-
-               public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-                       return new Vertex<K, VV>(value.f0, 
innerMapper.map(value.f0));
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public TypeInformation<Vertex<K, VV>> getProducedType() {
-                       TypeInformation<VV> valueType = TypeExtractor
-                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
-                       @SuppressWarnings("rawtypes")
-                       TypeInformation<?> returnType = new 
TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
-                       return (TypeInformation<Vertex<K, VV>>) returnType;
-               }
-       }
-       
        private static final class EmitSrcAndTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable>
                implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
                public void flatMap(Edge<K, EV> edge,

Reply via email to