Hi

I changed the code like this, but the error continues:

myUnionRdd.repartition(sparkNumberOfSlaves).foreachRDD(
        new Function<JavaPairRDD<String, String>, Void>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Void call(JavaPairRDD<String, String> v1) throws Exception {
                Map<String, String> localMap = v1.collectAsMap();
                long currentTime = System.currentTimeMillis();
                String oldfileName = "/mnt/intermediate_data/output_" + currentTime + ".part";
                String fileName = "/mnt/intermediate_data/output_" + currentTime + ".csv";
                PrintWriter writer = null;
                try {
                    writer = new PrintWriter(oldfileName, "UTF-8");
                    for (Map.Entry<String, String> entry : localMap.entrySet()) {
                        String key = entry.getKey();
                        String value = entry.getValue();
                        writer.println(key + "," + value);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (null != writer) {
                        writer.close();
                    }
                    File part = new File(oldfileName);
                    File csv = new File(fileName);
                    part.renameTo(csv);
                }
                return null;
            }
        });
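
One thing I am not sure about: since checkpointing serializes the whole DStream graph with its functions, the anonymous inner class above still keeps an implicit reference to its enclosing class, and whatever that outer instance holds gets pulled into serialization. A minimal sketch of the variant I may try next, moving the function into its own static nested (or top-level) class so it carries no outer reference; the class name CsvWriterFunction and the file paths are just placeholders for illustration:

import java.io.File;
import java.io.PrintWriter;
import java.util.Map;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

// Static nested / top-level class: no implicit reference to the enclosing
// driver class, so only its own (serializable) state is written out with
// the DStream graph when checkpointing is enabled.
public class CsvWriterFunction implements Function<JavaPairRDD<String, String>, Void> {

    private static final long serialVersionUID = 1L;

    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        // Collect the batch to the driver and write it out as key,value lines.
        Map<String, String> localMap = rdd.collectAsMap();
        long currentTime = System.currentTimeMillis();
        String partName = "/mnt/intermediate_data/output_" + currentTime + ".part";
        String csvName = "/mnt/intermediate_data/output_" + currentTime + ".csv";
        PrintWriter writer = null;
        try {
            writer = new PrintWriter(partName, "UTF-8");
            for (Map.Entry<String, String> entry : localMap.entrySet()) {
                writer.println(entry.getKey() + "," + entry.getValue());
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
            // Atomically expose the finished file under its .csv name.
            new File(partName).renameTo(new File(csvName));
        }
        return null;
    }
}

// usage:
// myUnionRdd.repartition(sparkNumberOfSlaves).foreachRDD(new CsvWriterFunction());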

On Thu, Jun 9, 2016 at 11:51 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> myFunction() is probably capturing unexpected things in the closure of the
> Function you have defined, because myFunction is defined outside. Try
> defining the myFunction inside the Function and see if the problem persists.
>
> On Thu, Jun 9, 2016 at 3:57 AM, sandesh deshmane <sandesh.v...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using spark streaming for streaming data from kafka 0.8
>>
>> I am using checkpointing in HDFS . I am getting error like below
>>
>> java.io.NotSerializableException: DStream checkpointing has been enabled
>> but the DStreams with their functions are not serialisable
>>
>> field (class:
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
>> name: foreachFunc$1, type: interface
>> org.apache.spark.api.java.function.Function)
>> - object (class
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1,
>> <function1>)
>> - field (class:
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
>> name: cleanedF$1, type: interface scala.Function1)
>> - object (class
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3,
>> <function2>)
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.dstream.ForEachDStream,
>> org.apache.spark.streaming.dstream.ForEachDStream@333c4112)
>> - element of array (index: 0)
>> - array (class [Ljava.lang.Object;, size 16)
>> - field (class: scala.collection.mutable.ArrayBuffer, name: array, type:
>> class [Ljava.lang.Object;)
>> - object (class scala.collection.mutable.ArrayBuffer,
>> ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@333c4112))
>> - writeObject data (class:
>> org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>> [
>> 0 checkpoint files
>>
>> ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
>> org.apache.spark.streaming.kafka.KafkaInputDStream@5f989b88)
>> - writeObject data (class:
>> org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>> [
>> 0 checkpoint files
>>
>> ])
>> - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
>> - object (class org.apache.spark.streaming.kafka.KafkaInputDStream,
>> org.apache.spark.streaming.kafka.KafkaInputDStream@36f6bc85)
>> - writeObject data (class:
>> org.apache.spark.streaming.dstream.DStreamCheckpointData)
>> - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
>> [
>> 0 checkpoint files
>>
>>
>> In my foreachRDD for dstream I use code like below
>>
>> foreachRDD(
>> new Function<JavaPairRDD<String, String>, Void>() {
>>
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>> public Void call(JavaPairRDD<String, String> v1)
>> throws Exception {
>> Map<String, String> localMap = v1.collectAsMap();
>> myfunction(localmap)
>>
>> };
>>
>> my function writes content of local map to file.
>>
>>
>> Thanks
>> Sandesh
>>
>
>
