Hello, I have a Spark Streaming application (using Spark 1.2.1) that watches an input directory, processes new JSON files as they are copied there, and writes the results to an output directory.
It uses a 3rd-party library to process the multi-line JSON files (https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it had already processed about 50 files:

  ...
  2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
  2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
  2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
  [Stage 0:==========================================================================================================================> (65 + 4) / 100]
  ^C

Then I started the application again, expecting it to recover from the checkpoint. For a while it started reading files again, and then it gave an exception:
  ...
  2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
  2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
  2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
  2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
  java.io.IOException: Missing configuration value for multilinejsoninputformat.member
          at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
          at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
          at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
          at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
          at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
          at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
          at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
          at org.apache.spark.scheduler.Task.run(Task.scala:56)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)

Since the exception refers to a missing configuration value, "multilinejsoninputformat.member", I think it is about the following line:

  ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member ", "itemSet");

That is also why I log its value, and as you can see above, just before the exception occurs during recovery, it shows that "multilinejsoninputformat.member" is set to "itemSet". But somehow the value is not found during recovery. This exception happens only when the application tries to recover from a previously interrupted run. I've also tried moving the above line into the "createContext" method, but still got the same exception.

Why is that? And how can I work around it?

--
Emre Sevinç
http://www.bigindustries.be/
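P.S. To make the setup easier to reproduce, here is a simplified sketch of the driver. The paths, app name, batch interval, and the omitted stream-processing body are placeholders, not the exact code from the gist; only the Hadoop configuration line and the checkpoint/getOrCreate pattern reflect what I am actually doing:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class SchemaValidatorDriver {

  // Placeholder paths for illustration only.
  private static final String CHECKPOINT_DIR = "/tmp/checkpoint";
  private static final String INPUT_DIR = "/tmp/input";

  private static JavaStreamingContext createContext() {
    SparkConf conf = new SparkConf().setAppName("SchemaValidator");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));
    ssc.checkpoint(CHECKPOINT_DIR);

    // The key required by MultiLineJsonInputFormat; this is the line that
    // seems to have no effect after recovery, even when placed here.
    ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");

    // ... set up the fileStream on INPUT_DIR and the JSON processing here ...
    return ssc;
  }

  public static void main(String[] args) {
    // On a clean start this calls createContext(); after a crash it
    // rebuilds the context from the checkpoint data instead.
    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
        CHECKPOINT_DIR,
        new JavaStreamingContextFactory() {
          @Override
          public JavaStreamingContext create() {
            return createContext();
          }
        });
    ssc.start();
    ssc.awaitTermination();
  }
}
```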