Any help on this?

On Thu, Aug 31, 2017 at 10:30 AM, ayan guha <guha.a...@gmail.com> wrote:
> Hi
>
> I want to understand a basic issue. Here is my code:
>
> def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
>   val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
>   ssc
> }
>
> val ssc = StreamingContext.getOrCreate(sparkCheckpointDir, () =>
>   createStreamingContext(sparkCheckpointDir, batchDuration))
>
> val inputDirectStream =
>   EventHubsUtils.createDirectStreams(ssc, namespace, progressDir,
>     Map(name -> eventhubParameters))
>     .map(receivedRecord => new String(receivedRecord.getBody))
>
> inputDirectStream.foreachRDD { (rdd: RDD[String], time: Time) =>
>   val df = spark.read.json(rdd)
>   df.show(truncate = false)
> }
>
> ssc.start()
> ssc.awaitTermination()
>
> The above code works, and I can see the DF.
>
> The issue is: if I enable checkpointing with
>
> def createStreamingContext(sparkCheckpointDir: String, batchDuration: Int) = {
>   val ssc = new StreamingContext(spark.sparkContext, Seconds(batchDuration))
>   ssc.checkpoint(sparkCheckpointDir)
>   ssc
> }
>
> then ssc.start() fails with
>
> "DStream checkpointing has been enabled but the DStreams with their
> functions are not serializable"
> <https://stackoverflow.com/questions/40337664/dstream-checkpointing-has-been-enabled-but-the-dstreams-with-their-functions-are>
>
> What am I doing wrong? I want to process the DF with checkpointing
> enabled. Any luck?
>
> --
> Best Regards,
> Ayan Guha

--
Best Regards,
Ayan Guha