I'm using Spark Streaming and I want to configure checkpointing to handle fault tolerance. I've been reading the documentation. Is it necessary to create and configure the InputDStream inside the function passed to getOrCreate?
I checked the example at https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala and it seems to do everything inside that function. Should I put all the logic of the application inside it? That doesn't feel right to me... If I only create the context inside the function, I get this error:

```
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
```

I'm not very good with Scala. This is the code I wrote:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def functionToCreateContext(): StreamingContext = {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
  val ssc = new StreamingContext(sparkConf, Seconds(5)) // new context
  ssc.checkpoint("/tmp/spark/metricsCheckpoint") // set checkpoint directory
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint", functionToCreateContext _)

val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
val topics = args(1).split("\\,")
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

directKafkaStream.foreachRDD { rdd =>
  ...
```
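To make sure I understand the alternative I'm asking about, this is roughly what I think "putting everything inside the function" would look like (same checkpoint path, Kafka parameters, and placeholder processing as in my code above, so treat it as a sketch of my understanding rather than working code):

```scala
// Sketch: all DStream setup and processing happen inside the factory function,
// so getOrCreate can either build the graph fresh or restore it from the checkpoint.
def functionToCreateContext(): StreamingContext = {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  ssc.checkpoint("/tmp/spark/metricsCheckpoint")

  // Kafka setup moved inside the function (args as in my code above)
  val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
  val topics = args(1).split("\\,")
  val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics.toSet)

  directKafkaStream.foreachRDD { rdd =>
    // ... the application logic from above ...
  }

  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint", functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
```

Is this really the intended pattern, or is there a way to keep the stream definition outside of getOrCreate?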