Hi all, I want to load an InputDStream from a checkpoint, but it doesn't work, and after trying several things I have finally run out of ideas.
So, here's what I do:

1. I create the streaming context - or load it from the checkpoint directory:

    def main(args: Array[String]) {
      val ssc = StreamingContext.getOrCreate("files/checkpoint", createStreamingContext _)
      ssc.start()
      ssc.awaitTermination()
    }

2. In the function createStreamingContext(), I first create a new Spark config, then the streaming context, then a DStream from Kafka, and finally I checkpoint the streaming context and return it:

    def createStreamingContext(): StreamingContext = {
      println("New Context")
      // ...first I create a new Spark config...
      val conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("CheckpointTest")
        .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      // ...then I create the streaming context...
      val ssc = new StreamingContext(conf, Seconds(1))
      var offsetRanges = Array[OffsetRange]()
      val kafkaParams = Map(
        "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
        "auto.offset.reset" -> "smallest") // Start from beginning
      val kafkaTopics = Set("Bla")
      // ...then I go and get a DStream from Kafka...
      val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte],
        StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
      // ...I do stuff with the DStream...
      // ...and finally I checkpoint the streaming context and return it
      ssc.checkpoint("files/checkpoint")
      ssc
    }

3. When I start the application, after a while it creates an empty directory in files/checkpoint/ with a name like 23207ed2-c021-4a1d-8af8-0620a19a8665. But that's all - no further files or directories ever appear there.

4. When I stop the application and restart it, it creates a new streaming context each time. (This also means it starts the Kafka streaming from the smallest available offset again and again. My main reason for using checkpoints was not to have to keep track of Kafka offsets myself.)

So, what am I doing wrong?

Thanks a lot!
Kathi
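P.S. In case it helps, here is a condensed, self-contained version of the whole program in one block. This is only a sketch of what I understand the getOrCreate contract to be; the map(...).print() output operation stands in for my real "do stuff" code (Spark Streaming needs at least one output operation registered before start(), otherwise no jobs - and no checkpoints - are ever produced):

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object CheckpointTest {
      val checkpointDir = "files/checkpoint"

      def createStreamingContext(): StreamingContext = {
        // This function is only called when no valid checkpoint exists yet;
        // on restart, getOrCreate should rebuild everything from the checkpoint.
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("CheckpointTest")
          .set("spark.streaming.kafka.maxRatePerPartition", "10000")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(checkpointDir)

        val kafkaParams = Map(
          "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
          "auto.offset.reset" -> "smallest") // Start from beginning
        val stream = KafkaUtils.createDirectStream[String, Array[Byte],
          StringDecoder, DefaultDecoder](ssc, kafkaParams, Set("Bla"))

        // Placeholder for the real processing: an output operation is required
        // so that Spark actually schedules (and checkpoints) batches.
        stream.map { case (_, value) => value.length }.print()
        ssc
      }

      def main(args: Array[String]): Unit = {
        val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

(Versions, in case it matters: this is against the Spark 1.x spark-streaming-kafka direct API.)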