Hi,

I am using Spark Streaming to consume data from Kafka 0.8.

I have enabled checkpointing to HDFS, and I am getting the error below:

java.io.NotSerializableException: DStream checkpointing has been enabled
but the DStreams with their functions are not serializable

field (class:
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
name: foreachFunc$1, type: interface
org.apache.spark.api.java.function.Function)
- object (class
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
<function1>)
- field (class:
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
name: cleanedF$1, type: interface scala.Function1)
- object (class
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
<function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream,
org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
- writeObject data (class:
org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files


In the foreachRDD for my DStream I use code like the following:

dstream.foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Void call(JavaPairRDD<String, String> v1)
                throws Exception {
            Map<String, String> localMap = v1.collectAsMap();
            myfunction(localMap);
            return null;
        }
    });

My function writes the contents of the local map to a file.
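For context, this is roughly what that function does — a minimal standalone sketch (the name `myfunction`, the output path handling, and the tab-separated line format are my simplifications, not the exact production code):

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

public class WriteMapToFile {

    // Sketch of "myfunction": dump each key/value pair of the
    // collected map as one tab-separated line in a text file.
    static void myfunction(Map<String, String> localMap, Path out)
            throws IOException {
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(out))) {
            for (Map.Entry<String, String> e : localMap.entrySet()) {
                w.println(e.getKey() + "\t" + e.getValue());
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the map returned by collectAsMap() on the driver.
        Map<String, String> m = new LinkedHashMap<>();
        m.put("k1", "v1");
        m.put("k2", "v2");

        Path out = Files.createTempFile("map", ".txt");
        myfunction(m, out);
        System.out.println("wrote " + Files.readAllLines(out).size() + " lines");
    }
}
```

Since the file is written from the driver (after collectAsMap()), the function itself does nothing Spark-specific; the serialization problem seems to come from the Function wrapper in foreachRDD, not from this helper.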


Thanks
Sandesh
