Hi, I changed the code like this but the error continues:
myUnionRdd.repartition(sparkNumberOfSlaves).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1) throws Exception {
            Map<String, String> localMap = v1.collectAsMap();
            long currentTime = System.currentTimeMillis();
            String oldfileName = "/mnt/intermediate_data/output_" + currentTime + ".part";
            String fileName = "/mnt/intermediate_data/output_" + currentTime + ".csv";
            PrintWriter writer = null;
            try {
                writer = new PrintWriter(oldfileName, "UTF-8");
                for (Map.Entry<String, String> entry : localMap.entrySet()) {
                    writer.println(entry.getKey() + "," + entry.getValue());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != writer) {
                    writer.close();
                }
                File part = new File(oldfileName);
                File csv = new File(fileName);
                part.renameTo(csv);
            }
            return null;
        }
    });

On Thu, Jun 9, 2016 at 11:51 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> myFunction() is probably capturing unexpected things in the closure of the
> Function you have defined, because myFunction is defined outside. Try
> defining myFunction inside the Function and see if the problem persists.
>
> On Thu, Jun 9, 2016 at 3:57 AM, sandesh deshmane <sandesh.v...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using Spark Streaming to stream data from Kafka 0.8.
>>
>> I am using checkpointing in HDFS.
>> I am getting an error like the one below:
>>
>> java.io.NotSerializableException: DStream checkpointing has been enabled
>> but the DStreams with their functions are not serializable
>>
>> field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
>>   name: foreachFunc$1, type: interface org.apache.spark.api.java.function.Function)
>> - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1, <function1>)
>> - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
>>   name: cleanedF$1, type: interface scala.Function1)
>> - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
>> - element of array (index: 0)
>> - array (class [Ljava.lang.Object;, size 16)
>> - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
>> - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object
>> (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
>>   0 checkpoint files
>>   ])
>>
>> In my foreachRDD for the DStream I use code like this:
>>
>> foreachRDD(
>>     new Function<JavaPairRDD<String, String>, Void>() {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Override
>>         public Void call(JavaPairRDD<String, String> v1) throws Exception {
>>             Map<String, String> localMap = v1.collectAsMap();
>>             myFunction(localMap);
>>             return null;
>>         }
>>     });
>>
>> myFunction writes the contents of the local map to a file.
>>
>> Thanks
>> Sandesh
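For anyone hitting the same trace: the mechanism can be reproduced without Spark at all. An anonymous inner class declared in an instance context holds a hidden reference to its enclosing instance; if that enclosing object (typically your driver/job class) is not serializable, serializing the function fails with the same NotSerializableException Spark's checkpoint validation reports. A minimal sketch, with all names (`ClosureDemo`, `Fn`, `tag`) purely illustrative and not from the thread:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {  // NOT Serializable, standing in for the driver class

    interface Fn extends Serializable {
        void call();
    }

    private final String tag = "outer-state";

    // Anonymous class in an instance context that touches outer state:
    // it captures ClosureDemo.this, which is not serializable.
    Fn capturing() {
        return new Fn() {
            @Override
            public void call() {
                System.out.println(tag);  // forces capture of the enclosing instance
            }
        };
    }

    // Defined in a static context with no outer references: serializes cleanly.
    static Fn selfContained() {
        return new Fn() {
            @Override
            public void call() {
                System.out.println("no outer state");
            }
        };
    }

    // Round-trip through Java serialization; false on NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;  // same failure mode as the DStream checkpointing error
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing: " + serializes(new ClosureDemo().capturing()));
        System.out.println("selfContained: " + serializes(ClosureDemo.selfContained()));
    }
}
```

This is why the advice above is to define myFunction inside the Function (or make it a static method of a top-level class): the closure then carries no reference to the non-serializable outer object.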