I'm adding this 3rd party library to my Maven pom.xml file so that it's
embedded into the JAR I send to spark-submit:

  <dependency>
    <groupId>json-mapreduce</groupId>
    <artifactId>json-mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-io</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-lang</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
      </exclusion>
    </exclusions>
  </dependency>


Then I build my über JAR and run my Spark Streaming application from the
command line:

  spark-submit --class com.example.schemavalidator.SchemaValidatorDriver \
    --master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar
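
To double-check that the library's classes really end up inside the über JAR,
something like the following could be used. This is only a minimal sketch: the
class name JarClassCheck and its containsClass helper are made up for
illustration, the default JAR path is the one from the command above, and the
class looked up is the one named in the stack trace quoted below. It uses only
java.util.jar from the JDK:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Spark or json-mapreduce): checks whether
// a JAR file contains the .class entry for a fully qualified class name.
final class JarClassCheck {

    static boolean containsClass(String jarPath, String className) throws IOException {
        // JAR entries use '/' separators and end in ".class".
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: default path taken from the spark-submit command above.
        String jarPath = args.length > 0 ? args[0] : "target/myapp-1.0-SNAPSHOT.jar";
        // Class name taken from the stack trace in the quoted message.
        String className = "com.alexholmes.json.mapreduce.MultiLineJsonInputFormat";

        if (!new File(jarPath).isFile()) {
            System.out.println("JAR not found: " + jarPath);
            return;
        }
        System.out.println(className + " embedded: " + containsClass(jarPath, className));
    }
}
```

If this reports false, the dependency was not packaged into the JAR. If it
reports true, the packaging is probably not the cause of the exception.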

--
Emre Sevinç


On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das <t...@databricks.com> wrote:

> That could be a corner case bug. How do you add the 3rd party library to
> the class path of the driver? Through spark-submit? Could you give the
> command you used?
>
> TD
>
> On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
>> I've also tried the following:
>>
>>     Configuration hadoopConfiguration = new Configuration();
>>     hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");
>>
>>     JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
>>         checkpointDirectory, hadoopConfiguration, factory, false);
>>
>>
>> but I still get the same exception.
>>
>> Why doesn't getOrCreate ignore that Hadoop configuration part (which
>> normally works, e.g. when not recovering)?
>>
>> --
>> Emre
>>
>>
>> On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc <emre.sev...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I have a Spark Streaming application (that uses Spark 1.2.1) that
>>> listens to an input directory and, when new JSON files are copied to that
>>> directory, processes them and writes them to an output directory.
>>>
>>> It uses a 3rd party library to process the multi-line JSON files (
>>> https://github.com/alexholmes/json-mapreduce). You can see the relevant
>>> part of the streaming application at:
>>>
>>>   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
>>>
>>> When I run this application locally, it works perfectly fine. But then I
>>> wanted to test whether it could recover from failure, e.g. if I stopped it
>>> right in the middle of processing some files. I started the streaming
>>> application, copied 100 files to the input directory, and hit Ctrl+C when
>>> it had already processed about 50 files:
>>>
>>> ...
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to process : 1
>>> [Stage 0:==========================================================================================================================> (65 + 4) / 100]
>>> ^C
>>>
>>> Then I started the application again, expecting that it could recover
>>> from the checkpoint. It started to read files again for a while and then
>>> threw an exception:
>>>
>>> ...
>>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to process : 1
>>> 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet
>>> 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
>>> java.io.IOException: Missing configuration value for multilinejsoninputformat.member
>>>     at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Since the exception refers to a missing configuration value,
>>> "multilinejsoninputformat.member", I think it is related to the following line:
>>>
>>>    ssc.ssc().sc().hadoopConfiguration().set("multilinejsoninputformat.member", "itemSet");
>>>
>>> That is why I also log its value. As you can see above, just before the
>>> exception is thrown during recovery, the log shows that
>>> "multilinejsoninputformat.member" is set to "itemSet". Yet somehow the
>>> value is not found during recovery. This exception happens only when the
>>> application tries to recover from a previously interrupted run.
>>>
>>> I've also tried moving the above line into the "createContext" method,
>>> but still had the same exception.
>>>
>>> Why is that?
>>>
>>> And how can I work around it?
>>>
>>> --
>>> Emre Sevinç
>>> http://www.bigindustries.be/
>>>
>>>
>>
>>
>> --
>> Emre Sevinc
>>
>
>


-- 
Emre Sevinc
