Yes, you need to follow the documentation: configure your stream, including the transformations applied to it, inside the function you pass to getOrCreate, as sketched below.
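Something along these lines (a rough sketch based on your snippet, assuming the Spark 1.x Kafka direct-stream API you are already using; the broker list and topic set are placeholders for your args(0) / args(1) values):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def functionToCreateContext(): StreamingContext = {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
  val ssc = new StreamingContext(sparkConf, Seconds(5))
  ssc.checkpoint("/tmp/spark/metricsCheckpoint")

  // Create the DStream and all its transformations here, so they can be
  // rebuilt from the checkpoint data on recovery.
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092") // placeholder
  val topics = Set("myTopic") // placeholder
  val directKafkaStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topics)

  directKafkaStream.foreachRDD { rdd =>
    // your processing logic
  }

  ssc
}

// On the first run this calls functionToCreateContext(); after a failure it
// recreates the context (stream included) from the checkpoint directory.
val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
  functionToCreateContext _)
ssc.start()
ssc.awaitTermination()

The key difference from your code is that the stream creation and the foreachRDD live inside functionToCreateContext rather than after getOrCreate; that is why your DirectKafkaInputDStream was "not initialized" when the context came back from the checkpoint.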
On Tue, Jul 28, 2015 at 3:14 AM, Guillermo Ortiz <konstt2...@gmail.com> wrote:
> I'm using Spark Streaming and I want to configure checkpointing to manage
> fault-tolerance.
> I've been reading the documentation. Is it necessary to create and
> configure the InputDStream in the getOrCreate function?
>
> I checked the example in
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
> and it looks like it does everything inside of the function. Should I put
> all the logic of the application inside it? I don't think that's the way...
>
> If I just create the context I get an error:
> Exception in thread "main" org.apache.spark.SparkException:
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not
> been initialized
>     at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>     at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
>
> I'm not very good with Scala... the code that I wrote:
>
> def functionToCreateContext(): StreamingContext = {
>   val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
>   val ssc = new StreamingContext(sparkConf, Seconds(5)) // new context
>   ssc.checkpoint("/tmp/spark/metricsCheckpoint") // set checkpoint directory
>   ssc
> }
>
> val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
>   functionToCreateContext _)
> val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
> val topics = args(1).split("\\,")
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
>
> directKafkaStream.foreachRDD { rdd => ...
>