I've also tried the following:

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false);

but I still get the same exception.

Why does getOrCreate appear to ignore that Hadoop configuration (which normally works, e.g. when not recovering)?

--
Emre

On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello,
>
> I have a Spark Streaming application (using Spark 1.2.1) that watches an
> input directory. When new JSON files are copied to that directory, it
> processes them and writes the results to an output directory.
>
> It uses a 3rd party library to process the multi-line JSON files
> (https://github.com/alexholmes/json-mapreduce). You can see the relevant
> part of the streaming application at:
>
> https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>
> When I run this application locally, it works perfectly fine. But then I
> wanted to test whether it could recover from failure, e.g. if I stopped it
> right in the middle of processing some files. I started the streaming
> application, copied 100 files to the input directory, and hit Ctrl+C when
> it had already processed about 50 files:
>
> ...
> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
> [Stage 0:==========================================================================================================================> (65 + 4) / 100]
> ^C
>
> Then I started the application again, expecting that it could recover from
> the checkpoint. For a while it read files again, and then it threw an
> exception:
>
> ...
> 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
> 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
> java.io.IOException: Missing configuration value for multilinejsoninputformat.member
>     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> Since in the exception it refers to a missing configuration
> "multilinejsoninputformat.member", I think it is about the following line:
>
> ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
>
> And this is why I also log the value of it, and as you can see above, just
> before it gives the exception in the recovery process, it shows that
> "multilinejsoninputformat.member" is set to "itemSet". But somehow it is
> not found during the recovery. This exception happens only when it tries
> to recover from a previously interrupted run.
>
> I've also tried moving the above line into the "createContext" method, but
> still had the same exception.
>
> Why is that?
>
> And how can I work around it?
>
> --
> Emre Sevinç
> http://www.bigindustries.be/

--
Emre Sevinc