Hello, everyone.
I am developing a streaming application that works with window functions: each window creates a table and performs some SQL operations on the extracted data. I have run into the following problem: when window operations are combined with checkpointing, the application fails to start the next time (after a restart).
Here is the code:

------------------------------------------------------------------------

// Imports used by this snippet:
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);

final SparkConf conf = new SparkConf();
conf.setAppName("Streaming");
conf.setMaster("local[4]");

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext =
            new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);

        return streamingContext;
    }
};

JavaStreamingContext streamingContext =
    JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(),
        contextFactory, true);
JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);

lines.countByWindow(windowDuration, slideDuration).print();

streamingContext.start();
streamingContext.awaitTermination();

------------------------------------------------------------------------
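
While digging around, I saw that the checkpointing section of the Spark Streaming programming guide defines the whole computation inside the factory passed to getOrCreate(), so that the DStream graph can be restored from the checkpoint on restart. If I understand it correctly, my code would have to look roughly like this (just a sketch, same constants as above, with only the stream setup moved into create()):

------------------------------------------------------------------------

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext =
            new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);

        // The whole DStream graph is defined here, so it is only built
        // when no checkpoint exists; on restart it is restored instead.
        JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);
        lines.countByWindow(windowDuration, slideDuration).print();

        return streamingContext;
    }
};

JavaStreamingContext streamingContext =
    JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(),
        contextFactory, true);

streamingContext.start();
streamingContext.awaitTermination();

------------------------------------------------------------------------

Is that the intended pattern?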

I expect that after an application restart Spark will merge the old event counter with the new values (and if that is not the case, I am ready to merge the old data manually; see my sketch at the end of this mail).
But after restarting the application, I get this error:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)
    at my.package.FileAggregations.main(FileAggregations.java:76)

The code at FileAggregations.java:76 is:

streamingContext.start();

The Spark version is 1.3.0.
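
In case the merge really does have to be done by hand, this is roughly what I have in mind (a hypothetical sketch only: readSavedCount(), saveCount() and COUNT_FILE are placeholders I would implement myself, not Spark API):

------------------------------------------------------------------------

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Total persisted before the restart; readSavedCount() would return 0
// on the very first run.
final long previousCount = readSavedCount(COUNT_FILE);

lines.countByWindow(windowDuration, slideDuration)
     .foreachRDD(new Function<JavaRDD<Long>, Void>() {
         @Override
         public Void call(JavaRDD<Long> rdd) {
             // countByWindow() emits a single count per window.
             for (Long windowCount : rdd.collect()) {
                 long merged = previousCount + windowCount; // fold in the pre-restart total
                 saveCount(COUNT_FILE, merged);             // persist for the next restart
                 System.out.println("merged count: " + merged);
             }
             return null;
         }
     });

------------------------------------------------------------------------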

---
wbr, Alexandr Krasheninnikov
