Hi all, I have tried to use lambda expression in spark task, and it throws "java.lang.IllegalArgumentException: Invalid lambda deserialization" exception. This exception is thrown when the is code like "transform(pRDD->pRDD.map(t->t._2))" . The code snippet is below.
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);JavaDStream<Integer> con = aggregate.transform((Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( (Function<Tuple2<String,Integer>,Integer>)t->t._2)); JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);JavaDStream<Integer> con = aggregate.transform((Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map( (Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2)); The above two options didn't worked. Where as if I pass below object "f" as the argument instead of lambda expression"t->t_.2". It works. Function f = new Function<Tuple2<String,Integer>,Integer>(){@Overridepublic Integer call(Tuple2<String,Integer> paramT1) throws Exception {return paramT1._2;}}; May I know what is the right format to express that functions as a lambda expression. public static void main(String[] args) { Function f = new Function<Tuple2<String,Integer>,Integer>(){ @Override public Integer call(Tuple2<String,Integer> paramT1) throws Exception { return paramT1._2; } }; JavaStreamingContext ssc = JavaStreamingFactory.getInstance(); JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999); JavaDStream<String> words = lines.flatMap(s->{return Arrays.asList(s.split(" "));}); JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x->new Tuple2<String,Integer>(x,1)); JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y); JavaDStream<Integer> con = aggregate.transform( (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( (Function<Tuple2<String,Integer>,Integer>)t->t._2)); //JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works con.print(); ssc.start(); ssc.awaitTermination(); }