I'm adding this 3rd party library to my Maven pom.xml file so that it's embedded into the JAR I send to spark-submit:
<dependency>
  <groupId>json-mapreduce</groupId>
  <artifactId>json-mapreduce</artifactId>
  <version>1.0-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>commons-io</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>commons-lang</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Then I build my über JAR, and run my Spark Streaming application via the command line:

spark-submit --class com.example.schemavalidator.SchemaValidatorDriver --master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar

--
Emre Sevinç


On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das <t...@databricks.com> wrote:

> That could be a corner case bug. How do you add the 3rd party library to
> the classpath of the driver? Through spark-submit? Could you give the
> command you used?
>
> TD
>
> On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> I've also tried the following:
>>
>> Configuration hadoopConfiguration = new Configuration();
>> hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>>
>> JavaStreamingContext ssc =
>>     JavaStreamingContext.getOrCreate(checkpointDirectory,
>>         hadoopConfiguration, factory, false);
>>
>> but I still get the same exception.
>>
>> Why does getOrCreate ignore that Hadoop configuration (which normally
>> works, e.g. when not recovering)?
>>
>> --
>> Emre
>>
>>
>> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a Spark Streaming application (using Spark 1.2.1) that listens
>>> to an input directory, and when new JSON files are copied to that
>>> directory, processes them and writes the results to an output directory.
>>>
>>> It uses a 3rd party library to process the multi-line JSON files
>>> (https://github.com/alexholmes/json-mapreduce). You can see the relevant
>>> part of the streaming application at:
>>>
>>> https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>>
>>> When I run this application locally, it works perfectly fine. But then
>>> I wanted to test whether it could recover from failure, e.g. if I stopped
>>> it right in the middle of processing some files. I started the streaming
>>> application, copied 100 files to the input directory, and hit Ctrl+C when
>>> it had already processed about 50 files:
>>>
>>> ...
>>> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1
>>> [Stage 0:==========================================================>    (65 + 4) / 100]
>>> ^C
>>>
>>> Then I started the application again, expecting that it could recover
>>> from the checkpoint. For a while it started to read files again, and then
>>> it gave an exception:
>>>
>>> ...
>>> 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
>>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
>>> java.io.IOException: Missing configuration value for multilinejsoninputformat.member
>>>     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Since the exception refers to a missing configuration value for
>>> "multilinejsoninputformat.member", I think it is about the following line:
>>>
>>> ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
>>>
>>> This is why I also log its value, and as you can see above, just before
>>> the exception occurs in the recovery process, the log shows that
>>> "multilinejsoninputformat.member" is set to "itemSet". But somehow it is
>>> not found during the recovery. This exception happens only when the
>>> application tries to recover from a previously interrupted run.
>>>
>>> I've also tried moving the above line into the "createContext" method,
>>> but still got the same exception.
>>>
>>> Why is that? And how can I work around it?
>>>
>>> --
>>> Emre Sevinç
>>> http://www.bigindustries.be/
>>>
>>
>>
>> --
>> Emre Sevinc
>
>

--
Emre Sevinc
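[Editor's note] For readers hitting the same error, the pattern the thread converges on can be sketched as below: re-apply the Hadoop setting to whichever context getOrCreate() returns, whether freshly created or recovered from the checkpoint, before calling start(). This is a hedged sketch only, not a confirmed fix: settings made inside the checkpointed factory are not restored on recovery, and whether a post-recovery set() reaches RDDs rebuilt from checkpoint data depends on Spark internals. The checkpoint path and the createContext() placeholder are hypothetical stand-ins for the application's real setup code.

```java
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

public class SchemaValidatorDriverSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical checkpoint location; use the application's real one.
        final String checkpointDirectory = "/tmp/checkpoint";

        // Factory used both on the first run and (in principle) on recovery.
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory);
            }
        };

        JavaStreamingContext ssc =
            JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

        // Re-set the key on the driver's Hadoop configuration AFTER getOrCreate()
        // has returned, so it is applied even when the context was recovered from
        // the checkpoint rather than built by the factory.
        ssc.ssc().sc().hadoopConfiguration()
           .set("multilinejsoninputformat.member", "itemSet");

        ssc.start();
        ssc.awaitTermination();
    }

    private static JavaStreamingContext createContext(String checkpointDir) {
        // Placeholder for the application-specific setup: build the streaming
        // context, the file stream, the processing pipeline, and call
        // checkpoint(checkpointDir), as in the original application.
        throw new UnsupportedOperationException("application-specific setup");
    }
}
```

If this does not help, an alternative worth trying is passing the value via spark-submit as a `spark.hadoop.*` property (Spark copies such properties into the Hadoop configuration it creates), e.g. `--conf spark.hadoop.multilinejsoninputformat.member=itemSet`; again, whether that survives checkpoint recovery is untested here.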