Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

2015-04-30 Thread Yang Lei
I finally isolated the issue to be related to the ActorSystem I reuse from
SparkEnv.get.actorSystem. This ActorSystem will contain the configuration
defined in my application jar's reference.conf in both local cluster case,
and in the case I use it directly in an extension to BaseRelation's buildScan
method. However if used in my RDD which is returned in the buildScan, it
loses the configuration.

I solve / bypass the problem by checking if my configuration exists in the
SparkEnv.get.actorSystem(settings.config) .If it does not exist, I will
create a new ActorSystem using my class's classLoader to force config
reading from my application jar:

val classLoader = this.getClass.getClassLoader

val myconfig = ConfigFactory.load(classLoader)// force config
reading from my classloader

ActorSystem("somename..",myconfig,classLoader)


I wonder if this different behavior of SparkEnv.get.actorSystem is
working-as-designed, or something is missing in executor setup for this
custom RDD driven execution case.


Thanks.


Yang


Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

2015-04-24 Thread Yang Lei
The configure is in the jar I passed in.  And if I do not create my own RDD for 
partitioned loading, everything is fine, in which case the task is run in 
executor right? So it seems some special call path before triggering my RDD 
compute makes the configure 'lost'. 

I will try to see if I can debug further. But any  insight in this special call 
path will be appreciated. 

Yang

Sent from my iPhone

> On Apr 24, 2015, at 8:14 PM, Reynold Xin  wrote:
> 
> This looks like a specific Spray configuration issue (or how Spray reads 
> config files). Maybe Spray is reading some local config file that doesn't 
> exist on your executors? 
> 
> You might need to email the Spray list.
> 
> 
>> On Fri, Apr 24, 2015 at 2:38 PM, Yang Lei  wrote:
>> forward to dev.
>> 
>> On Mon, Apr 20, 2015 at 10:46 AM, Yang Lei  wrote:
>> 
>> > I implemented two kinds of DataSource, one load data during buildScan,
>> > the other returning my RDD class with partition information for future
>> > loading.
>> >
>> > My RDD's compute gets actorSystem from  SparkEnv.get.actorSystem, then
>> > use Spray to interact with a HTTP endpoint, which is the same flow as
>> > loading data in buildScan.  All the Spray dependencies are included in a
>> > jar and passes to spark-submit using --jar.
>> >
>> > The Job is define in python.
>> >
>> > Both scenarios work testing locally using --master local[4]. For mesos,
>> > the not partitioned loading works too, but the partitioned loading hits the
>> > following exception.
>> >
>> > Traceback (most recent call last):
>> >
>> >   File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in
>> > 
>> >
>> > for code in airportData.collect():
>> >
>> >   File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py",
>> > line 293, in collect
>> >
>> > port =
>> > self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
>> >
>> >   File
>> > "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
>> > line 538, in __call__
>> >
>> >   File
>> > "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
>> > line 300, in get_return_value
>> >
>> > py4j.protocol.Py4JJavaError: An error occurred while calling
>> > o60.javaToPython.
>> >
>> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> > 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> > 36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing:
>> > No configuration setting found for key 'spray'
>> >
>> > at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)
>> >
>> > at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)
>> >
>> > at spray.can.HttpExt.(Http.scala:143)
>> >
>> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> >
>> > at
>> > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>> >
>> > at
>> > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> >
>> > at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>> >
>> > at
>> > akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>> >
>> > at scala.util.Try$.apply(Try.scala:161)
>> >
>> > at
>> > akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>> >
>> > at akka.actor.ExtensionKey.createExtension(Extension.scala:153)
>> >
>> > at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
>> >
>> > at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>> >
>> > at akka.actor.ExtensionKey.apply(Extension.scala:149)
>> >
>> > at akka.io.IO$.apply(IO.scala:30)
>> >
>> > at spray.client.pipelining$.sendReceive(pipelining.scala:35)
>> >
>> > at
>> > com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)
>> >
>> > at
>> > com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)
>> >
>> > at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)
>> >
>> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> >
>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> >
>> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> >
>> > at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> >
>> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> >
>> > at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >

Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

2015-04-24 Thread Reynold Xin
This looks like a specific Spray configuration issue (or how Spray reads
config files). Maybe Spray is reading some local config file that doesn't
exist on your executors?

You might need to email the Spray list.


On Fri, Apr 24, 2015 at 2:38 PM, Yang Lei  wrote:

> forward to dev.
>
> On Mon, Apr 20, 2015 at 10:46 AM, Yang Lei  wrote:
>
> > I implemented two kinds of DataSource, one load data during buildScan,
> > the other returning my RDD class with partition information for future
> > loading.
> >
> > My RDD's compute gets actorSystem from  SparkEnv.get.actorSystem, then
> > use Spray to interact with a HTTP endpoint, which is the same flow as
> > loading data in buildScan.  All the Spray dependencies are included in a
> > jar and passes to spark-submit using --jar.
> >
> > The Job is define in python.
> >
> > Both scenarios work testing locally using --master local[4]. For mesos,
> > the not partitioned loading works too, but the partitioned loading hits
> the
> > following exception.
> >
> > Traceback (most recent call last):
> >
> >   File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in
> > 
> >
> > for code in airportData.collect():
> >
> >   File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py",
> > line 293, in collect
> >
> > port =
> > self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
> >
> >   File
> >
> "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> > line 538, in __call__
> >
> >   File
> >
> "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> > line 300, in get_return_value
> >
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > o60.javaToPython.
> >
> > : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> > 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage
> > 36.0 (TID 147, 198.11.207.72):
> com.typesafe.config.ConfigException$Missing:
> > No configuration setting found for key 'spray'
> >
> > at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
> >
> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
> >
> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
> >
> > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
> >
> > at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)
> >
> > at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)
> >
> > at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)
> >
> > at spray.can.HttpExt.(Http.scala:143)
> >
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> >
> > at
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> >
> > at
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> >
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> >
> > at
> >
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
> >
> > at scala.util.Try$.apply(Try.scala:161)
> >
> > at
> >
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
> >
> > at akka.actor.ExtensionKey.createExtension(Extension.scala:153)
> >
> > at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
> >
> > at akka.actor.ExtensionId$class.apply(Extension.scala:79)
> >
> > at akka.actor.ExtensionKey.apply(Extension.scala:149)
> >
> > at akka.io.IO$.apply(IO.scala:30)
> >
> > at spray.client.pipelining$.sendReceive(pipelining.scala:35)
> >
> > at
> >
> com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)
> >
> > at
> >
> com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)
> >
> > at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)
> >
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> >
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> >
> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> >
> > at org.apache.spark.scheduler.Task.run(Task.scala:64)
> >
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> >
> > at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >
> > at java.lang.Thread.run(Thread.java:745)
> >
> > Is this due to some kind of classpath setup issue on the executor for the
> > external jar for handing RDD?
> >
> > Thanks in advance for any suggestions on how to resolve this.
> >
> > Yang
> >
>


Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos

2015-04-24 Thread Yang Lei
forward to dev.

On Mon, Apr 20, 2015 at 10:46 AM, Yang Lei  wrote:

> I implemented two kinds of DataSource, one load data during buildScan,
> the other returning my RDD class with partition information for future
> loading.
>
> My RDD's compute gets actorSystem from  SparkEnv.get.actorSystem, then
> use Spray to interact with a HTTP endpoint, which is the same flow as
> loading data in buildScan.  All the Spray dependencies are included in a
> jar and passes to spark-submit using --jar.
>
> The Job is define in python.
>
> Both scenarios work testing locally using --master local[4]. For mesos,
> the not partitioned loading works too, but the partitioned loading hits the
> following exception.
>
> Traceback (most recent call last):
>
>   File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in
> 
>
> for code in airportData.collect():
>
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py",
> line 293, in collect
>
> port =
> self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
>
>   File
> "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>
>   File
> "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o60.javaToPython.
>
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing:
> No configuration setting found for key 'spray'
>
> at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
>
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
>
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
>
> at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
>
> at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)
>
> at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)
>
> at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)
>
> at spray.can.HttpExt.(Http.scala:143)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>
> at
> akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at
> akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>
> at akka.actor.ExtensionKey.createExtension(Extension.scala:153)
>
> at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
>
> at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>
> at akka.actor.ExtensionKey.apply(Extension.scala:149)
>
> at akka.io.IO$.apply(IO.scala:30)
>
> at spray.client.pipelining$.sendReceive(pipelining.scala:35)
>
> at
> com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)
>
> at
> com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)
>
> at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Is this due to some kind of classpath setup issue on the executor for the
> external jar for handing RDD?
>
> Thanks in advance for any suggestions on how to resolve this.
>
> Yang
>