Hello, everyone.
I am developing a streaming application that uses window functions: each
window creates a table and performs some SQL operations on the extracted data.
I have run into the following problem: when window operations are combined
with checkpointing, the application fails to start after a restart.
Here is the code:
------------------------------------------------------------------------
final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);
final SparkConf conf = new SparkConf();
conf.setAppName("Streaming");
conf.setMaster("local[4]");
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);
        return streamingContext;
    }
};
JavaStreamingContext streamingContext =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new Configuration(),
                contextFactory, true);
JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);
lines.countByWindow(windowDuration, slideDuration).print();
streamingContext.start();
streamingContext.awaitTermination();
------------------------------------------------------------------------
I expect that after an application restart Spark will merge the old event
counter with the new values (and if that is not the case, I am ready to
merge the old data manually).
However, after restarting the application I get this error:
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@49db6f23 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)
    at my.package.FileAggregations.main(FileAggregations.java:76)
Line 76 of FileAggregations.java is:
streamingContext.start();
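From my reading of the Spark Streaming checkpointing docs, I suspect the cause is that my DStream graph (textFileStream + countByWindow) is built outside the factory, so on restart getOrCreate() restores the checkpointed graph but my freshly created DStreams are never initialized. Below is a sketch of the restructured version I intend to try, where the whole graph is defined inside the factory (same constants CHECKPOINT_DIR and SOURCE_DIR as above; not yet verified):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final Duration batchDuration = Durations.seconds(10);
final Duration slideDuration = Durations.seconds(10);
final Duration windowDuration = Durations.seconds(600);
final SparkConf conf = new SparkConf()
        .setAppName("Streaming")
        .setMaster("local[4]");

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, batchDuration);
        streamingContext.checkpoint(CHECKPOINT_DIR);
        // All DStream setup happens here: on a fresh start Spark calls
        // create() and builds this graph; on restart it restores the
        // same graph from the checkpoint and create() is never called.
        JavaDStream<String> lines = streamingContext.textFileStream(SOURCE_DIR);
        lines.countByWindow(windowDuration, slideDuration).print();
        return streamingContext;
    }
};

// No transformations are added after this point.
JavaStreamingContext streamingContext =
        JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, contextFactory);
streamingContext.start();
streamingContext.awaitTermination();
```

If that is indeed the rule (nothing may be added to the graph after getOrCreate()), it would explain why the restored MappedDStream is reported as "not initialized" only after a restart.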
Spark version is 1.3.0.
---
wbr, Alexandr Krasheninnikov