I believe you'll have to create the StreamingContext another way, by passing a
create function to getOrCreate:

private def setupSparkContext(): StreamingContext = {
  val streamingSparkContext = {
    val sparkConf = new SparkConf().setAppName(config.appName).setMaster(config.master)
    new StreamingContext(sparkConf, config.batchInterval)
  }
  streamingSparkContext.checkpoint(config.checkpointDir)
  streamingSparkContext
}

….
val ssc = StreamingContext.getOrCreate(config.checkpointDir, setupSparkContext)
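
One caveat: the creating function is only invoked when no checkpoint exists; on
restart the whole DStream graph is restored from the checkpoint instead. So the
Kafka stream and any output operations must also be set up inside that function,
not after getOrCreate returns. A sketch, assuming the same config, kafkaParams
and kafkaTopics as in your code below:

private def setupSparkContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName(config.appName).setMaster(config.master)
  val ssc = new StreamingContext(sparkConf, config.batchInterval)

  // Everything below is restored from the checkpoint on recovery; it is only
  // built from scratch when no checkpoint data exists yet.
  val stream = KafkaUtils.createDirectStream[String, Array[Byte],
    StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
  stream.print() // placeholder output operation

  ssc.checkpoint(config.checkpointDir)
  ssc
}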

Scaladoc for getOrCreate:

/**
 * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
 * If checkpoint data exists in the provided `checkpointPath`, then the StreamingContext will
 * be recreated from the checkpoint data. If the data does not exist, then the StreamingContext
 * will be created by calling the provided `creatingFunc`.
 *
 * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
 * @param creatingFunc   Function to create a new StreamingContext
 * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
 *                       file system
 * @param createOnError  Optional, whether to create a new StreamingContext if there is an
 *                       error in reading checkpoint data. By default, an exception will be
 *                       thrown on error.
 */
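
And if a corrupted checkpoint should not be fatal, a sketch using the optional
createOnError parameter from the Scaladoc above (hadoopConf keeps its default):

val ssc = StreamingContext.getOrCreate(
  config.checkpointDir,
  setupSparkContext _,
  createOnError = true) // fall back to a fresh context instead of throwing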

Hope this helps!

SM



> On 06-Nov-2015, at 8:19 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 
> Have you looked at the driver and executor logs?
> 
> Without being able to see what's in the "do stuff with the dstream" section
> of code... I'd suggest starting with a simpler job, e.g. one that does nothing
> but print each message, and verifying whether that checkpoints.
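> 
> A minimal sketch, assuming the same direct stream setup as in your code:
> 
>     val directKafkaStream = KafkaUtils.createDirectStream[String,
>       Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
>     directKafkaStream.print() // print the first few records of every batch
> 
> If the checkpoint directory stays empty even with that, the problem is in the
> setup rather than in the processing.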
> 
> On Fri, Nov 6, 2015 at 3:59 AM, Kathi Stutz <em...@kathistutz.de> wrote:
> Hi all,
> 
> I want to load an InputDStream from a checkpoint, but it doesn't work, and
> after trying several things I have finally run out of ideas.
> 
> So, here's what I do:
> 
> 1. I create the streaming context - or load it from the checkpoint directory.
> 
>   def main(args: Array[String]) {
>     val ssc = StreamingContext.getOrCreate("files/checkpoint",
>       createStreamingContext _)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> 
> 2. In the function createStreamingContext(), I first create a new Spark
> config...
> 
>   def createStreamingContext(): StreamingContext = {
>     println("New Context")
> 
>     val conf = new SparkConf()
>       .setMaster("local[2]")
>       .setAppName("CheckpointTest")
>       .set("spark.streaming.kafka.maxRatePerPartition", "10000")
> 
> //...then I create the streaming context...
>     val ssc = new StreamingContext(conf, Seconds(1))
> 
>     var offsetRanges = Array[OffsetRange]()
>     val kafkaParams = Map(
>       "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
>       "auto.offset.reset" -> "smallest") // Start from beginning
>     val kafkaTopics = Set("Bla")
> 
> //...then I go and get a DStream from Kafka...
>     val directKafkaStream = KafkaUtils.createDirectStream[String,
>       Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
> 
> //...I do stuff with the DStream
>     ...
> 
> //...and finally I checkpoint the streaming context and return it
>     ssc.checkpoint("files/checkpoint")
>     ssc
> }
> 
> 3. When I start the application, after a while it creates an empty directory
> in files/checkpoint/ with a name like 23207ed2-c021-4a1d-8af8-0620a19a8665.
> But that's all; no other files or directories ever appear there.
> 
> 4. When I stop the application and restart it, it creates a new streaming
> context each time. (This also means it starts the Kafka streaming from the
> smallest available offset again and again. My main reason for using
> checkpoints was not having to keep track of Kafka offsets myself.)
> 
> So, what am I doing wrong?
> 
> Thanks a lot!
> 
> Kathi
> 