Hi Biplob,

no, versions 0.10.1 and 1.0-SNAPSHOT are different. Could you bump your Flink version to the latter and check whether you can still reproduce your problem?
Cheers,
Till

On Mon, Jan 18, 2016 at 2:24 PM, Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi Till,
>
> I am using flink 0.10.1 and if i am not wrong it corresponds to the
> 1.0-Snapshot you mentioned.
>
> [image: Inline image 1]
>
> If wrong, please suggest what should I do to fix it.
>
> Thanks & Regards
> Biplob Biswas
>
> On Mon, Jan 18, 2016 at 11:23 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Biplob,
>>
>> which version of Flink are you using? With version 1.0-SNAPSHOT, I
>> cannot reproduce your problem.
>>
>> Cheers,
>> Till
>>
>>
>> On Sun, Jan 17, 2016 at 4:56 PM, Biplob Biswas <revolutioni...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am getting the following exception when i am using the map function
>>>
>>> Exception in thread "main"
>>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>>> type of function 'computeWeightedDistribution(GraphWeighted.java:73)' could
>>>> not be determined automatically, due to type erasure. You can give type
>>>> information hints by using the returns(...) method on the result of the
>>>> transformation call, or by letting your function implement the
>>>> 'ResultTypeQueryable' interface.
>>>>     at org.apache.flink.api.java.DataSet.getType(DataSet.java:176)
>>>>     at org.apache.flink.api.java.DataSet.groupBy(DataSet.java:692)
>>>>     at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:74)
>>>>     at aim3.SlashdotZooInDegree.main(SlashdotZooInDegree.java:39)
>>>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>>>> Input mismatch: Basic type 'Integer' expected but was 'Long'.
>>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:767)
>>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:276)
>>>>     at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:110)
>>>>     at org.apache.flink.api.java.DataSet.map(DataSet.java:213)
>>>>     at aim3.GraphWeighted.computeWeightedDistribution(GraphWeighted.java:73)
>>>>     ... 1 more
>>>
>>> This is the part of the code which I am trying to run :
>>>
>>>> DataSet<Tuple2<String, Long>> distinctVertex = sourceVertex
>>>>     .union(destinationVertex)
>>>>     .groupBy(0)
>>>>     .aggregate(Aggregations.SUM, 1);
>>>> // Compute the degrees (degree, count)
>>>>
>>>> DataSet<Tuple2<Long, Integer>> degreeCount = distinctVertex
>>>>     .map(new DegreeMapper())
>>>>     .groupBy(0)
>>>>     .aggregate(Aggregations.SUM, 1);
>>>
>>> and the error I am getting is at this line *.map(new DegreeMapper())*
>>>
>>> Also, the degree mapper is a simply map function which emits the second
>>> column and 1 as follows:
>>>
>>>> public static class DegreeMapper implements
>>>>         MapFunction<Tuple2<String, Long>, Tuple2<Long, Integer>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>     public Tuple2<Long, Integer> map(Tuple2<String, Long> input) throws
>>>>             Exception {
>>>>         return new Tuple2<Long, Integer>(input.f1, 1);
>>>>     }
>>>> }
>>>
>>> Now I am lost as to what I did wrong and why I am getting that error,
>>> any help would be appreciated.
>>>
>>> Thanks a lot.
>>>
>>> Thanks & Regards
>>> Biplob Biswas
>>>
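
The "Input mismatch" line in the quoted trace suggests that the element type recorded for the incoming DataSet (Integer in field 1) disagrees with the input type that DegreeMapper declares (Long). Below is a minimal plain-Java sketch of that kind of check. It is not Flink's actual TypeExtractor code: the MapFunction and Pair types and the check helper are hypothetical stand-ins, so that the example runs without a Flink dependency.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class InputTypeCheck {
    // Hypothetical stand-in for Flink's MapFunction interface.
    interface MapFunction<I, O> {
        O map(I value) throws Exception;
    }

    // Hypothetical stand-in for Flink's Tuple2.
    static class Pair<A, B> {
        final A f0;
        final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Same shape as the DegreeMapper from the thread: emits (second field, 1).
    static class DegreeMapper
            implements MapFunction<Pair<String, Long>, Pair<Long, Integer>> {
        public Pair<Long, Integer> map(Pair<String, Long> input) {
            return new Pair<Long, Integer>(input.f1, 1);
        }
    }

    // Reads the field types of the function's declared input type via reflection.
    // Generic information survives erasure here because it is part of the class
    // declaration ("implements MapFunction<...>").
    static Type[] declaredInputFields(Class<?> fn) {
        for (Type t : fn.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == MapFunction.class) {
                Type input = ((ParameterizedType) t).getActualTypeArguments()[0];
                return ((ParameterizedType) input).getActualTypeArguments();
            }
        }
        throw new IllegalArgumentException("not a MapFunction: " + fn);
    }

    // Compares the declared input field types with the field types the upstream
    // data actually has. The sketch only handles plain (non-generic) field classes.
    static String check(Class<?> fn, Class<?>... actualFields) {
        Type[] declared = declaredInputFields(fn);
        if (declared.length != actualFields.length) {
            return "Input mismatch: different number of fields.";
        }
        for (int i = 0; i < declared.length; i++) {
            if (!declared[i].equals(actualFields[i])) {
                return "Input mismatch: Basic type '" + actualFields[i].getSimpleName()
                        + "' expected but was '"
                        + ((Class<?>) declared[i]).getSimpleName() + "'.";
            }
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Upstream data really is (String, Long): the declaration matches.
        System.out.println(check(DegreeMapper.class, String.class, Long.class));
        // Upstream data is (String, Integer): same kind of message as in the trace.
        System.out.println(check(DegreeMapper.class, String.class, Integer.class));
    }
}
```

If the mismatch is real, field 1 may have been created as an Integer further upstream, for example in sourceVertex or destinationVertex, which are not shown in the thread. That is an assumption, not something the thread confirms.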