In a Spark Streaming sample I am trying to implicitly convert an RDD to a Dataset
and save it to permanent storage; the relevant snippet is at the end of this post.
The job runs fine the first time it is started, with an empty checkpoint directory.
However, if I kill the job and restart it with the same checkpoint directory, it
fails with the following error:
16/10/07 23:42:50 ERROR JobScheduler: Error running job streaming job 1475883550000 ms.0
java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
    at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
    at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/10/07 23:42:50 INFO SparkContext: Starting job: print at EventhubsToAzureBlobAsJSON.scala:93
Judging by the trace, the NullPointerException is thrown from the rddToDatasetHolder
implicit, so I suspect the SparkSession pulled in via import sparkSession.implicits._
is not restored correctly when the StreamingContext is recreated from the checkpoint.
Does anyone have sample recoverable Spark Streaming code that uses the Spark 2.0
SparkSession constructs? Here is the snippet I am running:
object EventhubsToAzureBlobAsJSON {

  def createStreamingContext(inputOptions: ArgumentMap): StreamingContext = {

    .....

    val sparkSession: SparkSession =
      SparkSession.builder.config(sparkConfiguration).getOrCreate

    import sparkSession.implicits._

    val streamingContext = new StreamingContext(
      sparkSession.sparkContext,
      Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

    streamingContext.checkpoint(
      inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String])

    val eventHubsStream =
      EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)

    val eventHubsWindowedStream = eventHubsStream
      .window(Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

    /**
     * This fails on restart
     */
    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd => rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
        .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)).asInstanceOf[String]))

    /**
     * This runs fine on restart
     */
    /*
    eventHubsWindowedStream.map(x => EventContent(new String(x)))
      .foreachRDD(rdd =>
        rdd.saveAsTextFile(
          inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)).asInstanceOf[String],
          classOf[GzipCodec]))
    */

    .....
  }

  def main(inputArguments: Array[String]): Unit = {

    val inputOptions =
      EventhubsArgumentParser.parseArguments(Map(), inputArguments.toList)

    EventhubsArgumentParser.verifyEventhubsToAzureBlobAsJSONArguments(inputOptions)

    // Create a fresh StreamingContext, or recreate one from checkpoint data
    val streamingContext = StreamingContext.getOrCreate(
      inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String],
      () => createStreamingContext(inputOptions))

    streamingContext.start()

    if (inputOptions.contains(Symbol(EventhubsArgumentKeys.TimeoutInMinutes))) {
      streamingContext.awaitTerminationOrTimeout(
        inputOptions(Symbol(EventhubsArgumentKeys.TimeoutInMinutes)).asInstanceOf[Long] * 60 * 1000)
    }
    else {
      streamingContext.awaitTermination()
    }
  }
}
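For what it is worth, the direction I am considering (untested) is to stop
capturing the driver-side SparkSession and its implicits in the foreachRDD
closure, and instead obtain the session lazily from the RDD's SparkContext,
along the lines of the lazily-instantiated session pattern in the Spark
Streaming programming guide. A minimal sketch, reusing EventContent and
eventHubsWindowedStream from above; outputFolder is a hypothetical stand-in
for the EventStoreFolder lookup:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Untested sketch: obtain the SparkSession inside foreachRDD from the RDD's
// own SparkContext, so that no session or implicits are serialized into the
// checkpoint. `outputFolder` stands in for the EventStoreFolder value
// resolved from inputOptions above.
eventHubsWindowedStream.map(x => EventContent(new String(x)))
  .foreachRDD { rdd =>
    val session = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    import session.implicits._ // fresh implicits, valid after recovery
    session.createDataset(rdd).toJSON
      .write.mode(SaveMode.Overwrite)
      .save(outputFolder)
  }

If this is the recommended way to make the Dataset conversion recoverable in
2.0, a confirmation or a pointer to a complete working example would be much
appreciated.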
Thanks, Arijit