Any help on this?

On Thu, Aug 31, 2017 at 10:30 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> I want to understand a basic issue. Here is my code:
>
> def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
>   val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
>   ssc
> }
>
> val ssc = StreamingContext.getOrCreate(sparkCheckpointDir,
>   () => createStreamingContext(sparkCheckpointDir, batchDuration))
>
> val inputDirectStream = EventHubsUtils
>   .createDirectStreams(ssc, namespace, progressDir, Map(name -> eventhubParameters))
>   .map(receivedRecord => new String(receivedRecord.getBody))
>
> inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
>   val df = spark.read.json(rdd)
>   df.show(truncate = false)
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> The above code works, and I can see the DataFrame.
>
> The issue is that if I enable checkpointing like this:
>
> def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
>   val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
>   ssc.checkpoint(sparkCheckpointDir)
>   ssc
> }
>
> Then ssc.start() fails with "DStream checkpointing has been enabled but the
> DStreams with their functions are not serializable"
> <https://stackoverflow.com/questions/40337664/dstream-checkpointing-has-been-enabled-but-the-dstreams-with-their-functions-are>
>
> What am I doing wrong? I want to process the DataFrame with checkpointing
> enabled. Any ideas?
>
> --
> Best Regards,
> Ayan Guha
>
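A hedged note on the likely mechanism, not a confirmed diagnosis: when ssc.checkpoint(...) is set, ssc.start() validates the context by Java-serializing a checkpoint of the DStream graph, and that graph includes the functions passed to map and foreachRDD. If one of those functions captures a non-serializable driver-side object (plausibly the outer `spark` session used inside foreachRDD, or an enclosing notebook/class instance), serialization fails with exactly this message; the full exception text usually includes a serialization stack naming the offending object. The Spark Streaming programming guide's pattern is to create the DStreams inside the function passed to StreamingContext.getOrCreate (on restart that function is not called and the graph is restored from the checkpoint) and to obtain the session inside foreachRDD with SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() rather than capturing it. The plain-Scala sketch below has no Spark dependency and only illustrates the underlying rule; DriverOnlyHandle and ClosureSerializationDemo are hypothetical names, not Spark APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side object such as a session or client.
// It deliberately does NOT extend Serializable.
class DriverOnlyHandle {
  def parse(s: String): Int = s.trim.length
}

// Extends Serializable so that, if the compiler makes a closure capture this
// object, that capture alone does not make the closure unserializable.
object ClosureSerializationDemo extends Serializable {

  // Plain Java serialization: the mechanism used when the DStream graph
  // (including functions passed to map/foreachRDD) is written to a checkpoint.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures `handle`, so serializing the function also tries to serialize it.
  def capturingClosure(handle: DriverOnlyHandle): String => Int =
    s => handle.parse(s)

  // Captures no outside object; everything it needs is local to its body.
  val selfContainedClosure: String => Int =
    s => s.trim.length

  def main(args: Array[String]): Unit = {
    val capturing = capturingClosure(new DriverOnlyHandle)
    println(s"capturing closure serializable: ${isSerializable(capturing)}")
    println(s"self-contained closure serializable: ${isSerializable(selfContainedClosure)}")
  }
}
```

In the streaming code, the analogue of selfContainedClosure would be a foreachRDD body that looks up the session from rdd.sparkContext instead of referring to an outer `spark` value. Whether that is the only offending capture cannot be confirmed from the snippet alone.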



