Hi,

I am using Spark Streaming to consume data from Kafka 0.8.
I have checkpointing enabled (to HDFS), and I am getting the error below:

java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1, name: foreachFunc$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream, org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])

In the foreachRDD for my DStream I use code like this:

foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Void call(JavaPairRDD<String, String> v1) throws Exception {
        Map<String, String> localMap = v1.collectAsMap();
        myfunction(localMap);
        return null;
    }
});

myfunction writes the contents of the local map to a file.

Thanks
Sandesh
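A likely cause (the trace points at foreachFunc$1 inside the ForEachDStream): when checkpointing is on, Spark serializes the whole DStream graph, including the Function passed to foreachRDD. An anonymous inner class keeps an implicit reference to its enclosing instance, so if that enclosing class (or anything it holds, such as myfunction's owner) is not serializable, checkpointing fails even though the Function itself declares serialVersionUID. The pure-Java sketch below (no Spark; CaptureDemo, NonSerializableHelper, and GoodFunction are hypothetical names for illustration) shows the difference between an anonymous inner class and a static nested class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stands in for driver-side state that is not serializable
    // (e.g. a file writer held by the class that defines myfunction).
    static class NonSerializableHelper {
        void write(String s) { /* writes to a file */ }
    }

    private final NonSerializableHelper helper = new NonSerializableHelper();

    // Anonymous inner class: carries a hidden reference to the enclosing
    // CaptureDemo instance, dragging `helper` into the serialized graph.
    Serializable badFunction() {
        return new Serializable() {
            @SuppressWarnings("unused")
            public void call(String s) { helper.write(s); }
        };
    }

    // Static nested class: captures nothing from the enclosing instance,
    // so it serializes cleanly. Open resources inside call(), not in fields.
    static class GoodFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        public void call(String s) { /* write to the file here */ }
    }

    // Mimics what DStream checkpointing does: Java-serialize the function.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println(serializes(demo.badFunction()));    // false: outer instance is pulled in
        System.out.println(serializes(new GoodFunction()));    // true: self-contained
    }
}
```

So one thing to try is moving the foreachRDD Function into a static nested (or top-level) class that holds no reference to non-serializable driver state, and doing the file writing inside call().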