[FLINK-1201] [gelly] fix ClassCastException and Type errors in mapVertices; 
fixes #41 and #46


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b751df28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b751df28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b751df28

Branch: refs/heads/master
Commit: b751df28863af4a6216d967067dbb6f3729b66da
Parents: 3cf734f
Author: vasia <vasilikikala...@gmail.com>
Authored: Tue Jan 6 20:54:25 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 51 ++++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b751df28/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 03fbf94..1cd5c90 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -106,7 +106,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
      * @return a new graph
      */
     public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final 
MapFunction<Vertex<K, VV>, NV> mapper) {
-       DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new 
ApplyMapperToVertexWithType<K, VV, NV>(mapper));
+       TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
+       DataSet<Vertex<K, NV>> mappedVertices = vertices.map(new 
ApplyMapperToVertexWithType<K, VV, NV>(mapper,
+                       keyType));
         return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), 
this.context);
     }
     
@@ -115,19 +117,24 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                <Vertex<K, VV>, Vertex<K, NV>>, ResultTypeQueryable<Vertex<K, 
NV>> {
        
                private MapFunction<Vertex<K, VV>, NV> innerMapper;
-               public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, 
NV> theMapper) {
+               private transient TypeInformation<K> keyType;
+               public ApplyMapperToVertexWithType(MapFunction<Vertex<K, VV>, 
NV> theMapper, TypeInformation<K> keyType) {
                        this.innerMapper = theMapper;
+                       this.keyType = keyType;
                }
                
                public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
                        return new Vertex<K, NV>(value.f0, 
innerMapper.map(value));
                }
        
+               @SuppressWarnings("unchecked")
                @Override
                public TypeInformation<Vertex<K, NV>> getProducedType() {
-                       return new TupleTypeInfo<Vertex<K, NV>>(
-                                       
((TupleTypeInfo<?>)(TypeExtractor.createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 0, null, null))).getTypeAt(0),
-                                       
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, 
null, null));
+                       TypeInformation<NV> valueType = TypeExtractor
+                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
+                       @SuppressWarnings("rawtypes")
+                       TypeInformation<?> returnType = new 
TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
+                       return (TypeInformation<Vertex<K, NV>>) returnType;
                }
     }
 
@@ -137,7 +144,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
      * @return 
      */
     public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final 
MapFunction<Edge<K, EV>, NV> mapper) {
-       DataSet<Edge<K, NV>> mappedEdges = edges.map(new 
ApplyMapperToEdgeWithType<K, EV, NV>(mapper));
+       TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
+       DataSet<Edge<K, NV>> mappedEdges = edges.map(new 
ApplyMapperToEdgeWithType<K, EV, NV>(mapper,
+                       keyType));
         return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
     }
     
@@ -146,21 +155,25 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                <Edge<K, EV>, Edge<K, NV>>, ResultTypeQueryable<Edge<K, NV>> {
        
                private MapFunction<Edge<K, EV>, NV> innerMapper;
+               private transient TypeInformation<K> keyType;
                
-               public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> 
theMapper) {
+               public ApplyMapperToEdgeWithType(MapFunction<Edge<K, EV>, NV> 
theMapper, TypeInformation<K> keyType) {
                        this.innerMapper = theMapper;
+                       this.keyType = keyType;
                }
                
                public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
                        return new Edge<K, NV>(value.f0, value.f1, 
innerMapper.map(value));
                }
        
+               @SuppressWarnings("unchecked")
                @Override
                public TypeInformation<Edge<K, NV>> getProducedType() {
-                       TypeInformation<K> keyType = ((TupleTypeInfo<?>)
-                                       
(TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, 
null, null))).getTypeAt(0);
-                       return new TupleTypeInfo<Edge<K, NV>>(keyType, keyType,
-                                       
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, 
null, null));
+                       TypeInformation<NV> valueType = TypeExtractor
+                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
+                       @SuppressWarnings("rawtypes")
+                       TypeInformation<?> returnType = new 
TupleTypeInfo<Edge>(Edge.class, keyType, keyType, valueType);
+                       return (TypeInformation<Edge<K, NV>>) returnType;
                        }
     }
 
@@ -604,9 +617,10 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> 
                Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final 
MapFunction<K, VV> mapper, 
                                ExecutionEnvironment context) {
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
                DataSet<Vertex<K, VV>> vertices = 
                                edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
-                               .distinct().map(new 
ApplyMapperToVertexValuesWithType<K, VV>(mapper));
+                               .distinct().map(new 
ApplyMapperToVertexValuesWithType<K, VV>(mapper, keyType));
                return new Graph<K, VV, EV>(vertices, edges, context);
        }
        
@@ -615,20 +629,25 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                <Tuple1<K>, Vertex<K, VV>>, ResultTypeQueryable<Vertex<K, VV>> {
 
                private MapFunction<K, VV> innerMapper;
+               private transient TypeInformation<K> keyType;
 
-               public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> 
theMapper) {
+               public ApplyMapperToVertexValuesWithType(MapFunction<K, VV> 
theMapper, TypeInformation<K> keyType) {
                        this.innerMapper = theMapper;
+                       this.keyType = keyType;
                }
 
                public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
                        return new Vertex<K, VV>(value.f0, 
innerMapper.map(value.f0));
                }
 
+               @SuppressWarnings("unchecked")
                @Override
                public TypeInformation<Vertex<K, VV>> getProducedType() {
-                       return new TupleTypeInfo<Vertex<K, VV>>(
-                                       
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 0, 
null, null),
-                                       
TypeExtractor.createTypeInfo(MapFunction.class, innerMapper.getClass(), 1, 
null, null));
+                       TypeInformation<VV> valueType = TypeExtractor
+                                       .createTypeInfo(MapFunction.class, 
innerMapper.getClass(), 1, null, null);
+                       @SuppressWarnings("rawtypes")
+                       TypeInformation<?> returnType = new 
TupleTypeInfo<Vertex>(Vertex.class, keyType, valueType);
+                       return (TypeInformation<Vertex<K, VV>>) returnType;
                }
        }
        

Reply via email to