Hi all,

I want to load an InputDStream from a checkpoint, but it doesn't work, and
after trying several things I have finally run out of ideas.

So, here's what I do:

1. I create the streaming context - or load it from the checkpoint directory.

  def main(args: Array[String]) {
    val ssc = StreamingContext.getOrCreate("files/checkpoint",
      createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }

2. In the function createStreamingContext(), I first create a new Spark
config...

  def createStreamingContext(): StreamingContext = {
    println("New Context")

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("CheckpointTest")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")

//...then I create the streaming context...
    val ssc = new StreamingContext(conf, Seconds(1))

    var offsetRanges = Array[OffsetRange]()
    val kafkaParams = Map(
      "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
      "auto.offset.reset" -> "smallest") // start from the beginning
    val kafkaTopics = Set("Bla")

//...then I go and get a DStream from Kafka...
    val directKafkaStream = KafkaUtils.createDirectStream[String,
        Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, kafkaTopics)

//...I do stuff with the DStream
    ...

//...and finally I checkpoint the streaming context and return it
    ssc.checkpoint("files/checkpoint")
    ssc
}
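
(For reference, the integration guide's pattern for filling a var like
offsetRanges looks roughly like this; a simplified sketch, not my exact
code:)

    // Sketch of the documented offset-capturing pattern for the direct
    // stream; needs org.apache.spark.streaming.kafka.HasOffsetRanges.
    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }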

3. When I start the application, after a while it creates an empty
directory in files/checkpoint/ with a name like
23207ed2-c021-4a1d-8af8-0620a19a8665. But that's all; no further files or
directories ever appear there.

4. When I stop the application and restart it, it creates a new streaming
context each time. (This also means it starts the Kafka streaming from the
smallest available offset again and again. My main reason for using
checkpoints was to avoid having to keep track of Kafka offsets myself.)
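
(For the record, the alternative I want to avoid is the fromOffsets variant
of createDirectStream from the integration guide, roughly sketched below;
loadSavedOffsets is a hypothetical helper:)

    // Sketch of the fromOffsets variant (per the integration guide); needs
    // kafka.common.TopicAndPartition and kafka.message.MessageAndMetadata.
    // loadSavedOffsets is a hypothetical helper reading persisted offsets.
    val fromOffsets: Map[TopicAndPartition, Long] = loadSavedOffsets()
    val stream = KafkaUtils.createDirectStream[String, Array[Byte],
        StringDecoder, DefaultDecoder, (String, Array[Byte])](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message))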

So, what am I doing wrong?

Thanks a lot!

Kathi
