myFunction() is probably capturing unexpected things in the closure of the
Function you have defined, because myFunction is defined outside of it. Try
defining myFunction (or its logic) inside the Function and see whether the
problem persists.
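For example, something along these lines (an untested sketch; "stream", the
output path, and the file-writing details are placeholders for whatever
myFunction actually does):

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

stream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        Map<String, String> localMap = rdd.collectAsMap();
        // Write the map here (or call a static helper) so the closure does
        // not pull in the enclosing, non-serializable object.
        try (BufferedWriter writer =
                 new BufferedWriter(new FileWriter("/tmp/output.txt", true))) {
            for (Map.Entry<String, String> entry : localMap.entrySet()) {
                writer.write(entry.getKey() + "\t" + entry.getValue());
                writer.newLine();
            }
        }
        return null;
    }
});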

On Thu, Jun 9, 2016 at 3:57 AM, sandesh deshmane <sandesh.v...@gmail.com>
wrote:

> Hi,
>
> I am using Spark Streaming to stream data from Kafka 0.8.
>
> I am using checkpointing in HDFS. I am getting an error like the one below:
>
> java.io.NotSerializableException: DStream checkpointing has been enabled
> but the DStreams with their functions are not serialisable
>
> field (class:
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
> name: foreachFunc$1, type: interface
> org.apache.spark.api.java.function.Function)
> - object (class
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
> <function1>)
> - field (class:
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
> name: cleanedF$1, type: interface scala.Function1)
> - object (class
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
> <function2>)
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.dstream.ForEachDStream,
> org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
> - element of array (index: 0)
> - array (class [Ljava.lang.Object;, size 16)
> - field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
> class [Ljava.lang.Object;)
> - object (class scala.collection.mutable.ArrayBuffer,
> ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
> - writeObject data (class:
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
>
> ])
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
> org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
> - writeObject data (class:
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
>
> ])
> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
> - object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
> org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
> - writeObject data (class:
> org.apache.spark.streaming.dstream.DStreamCheckpointData)
> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
> 0 checkpoint files
>
>
> In my foreachRDD for the DStream, I use code like the following:
>
> foreachRDD(
>     new Function<JavaPairRDD<String, String>, Void>() {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public Void call(JavaPairRDD<String, String> v1)
>                 throws Exception {
>             Map<String, String> localMap = v1.collectAsMap();
>             myFunction(localMap);
>             return null;
>         }
>     });
>
> myFunction writes the contents of the local map to a file.
>
>
> Thanks
> Sandesh
>
