Hello,

I have a Spark Streaming application (that uses Spark 1.2.1) that listens
to an input directory, and when new JSON files are copied to that directory
processes them, and writes them to an output directory.

It uses a 3rd party library to process the multi-line JSON files (
https://github.com/alexholmes/json-mapreduce). You can see the relevant
part of the streaming application at:

  https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

When I run this application locally, it works perfectly fine. But then I
wanted to test whether it could recover from failure, e.g. if I stopped it
right in the middle of processing some files. I started the streaming
application, copied 100 files to the input directory, and hit Ctrl+C when
it has alread processed about 50 files:

...
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
[Stage
0:==========================================================================================================================>
(65 + 4) / 100]
^C

Then I started the application again, expecting that it could recover from
the checkpoint. For a while it started to read files again and then gave an
exception:

...
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
 * * * hadoopConfiguration: itemSet
2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
(TID 0)
java.io.IOException: Missing configuration value for
multilinejsoninputformat.member
    at
com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
    at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Since in the exception it refers to a missing configuration
"multilinejsoninputformat.member", I think it is about the following line:

   ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member
", "itemSet");

And this is why I also log the value of it, and as you can see above, just
before it gives the exception in the recovery process, it shows that
"multilinejsoninputformat.member"
is set to "itemSet". But somehow it is not found during the recovery. This
exception happens only when it tries to recover from a previously
interrupted run.

I've also tried moving the above line into the "createContext" method, but
still had the same exception.

Why is that?

And how can I work around it?

-- 
Emre Sevinç
http://www.bigindustries.be/

Reply via email to