Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos
I finally isolated the issue: it is related to the ActorSystem I reuse from SparkEnv.get.actorSystem. This ActorSystem contains the configuration defined in my application jar's reference.conf both in the local cluster case and when I use it directly in my BaseRelation extension's buildScan method. However, when it is used in my RDD, the one returned by buildScan, the configuration is lost.

I solved (or rather bypassed) the problem by checking whether my configuration exists in SparkEnv.get.actorSystem.settings.config. If it does not, I create a new ActorSystem using my class's class loader, to force the config to be read from my application jar:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    val classLoader = this.getClass.getClassLoader
    val myconfig = ConfigFactory.load(classLoader) // force config reading from my class loader
    ActorSystem("somename..", myconfig, classLoader)

I wonder whether this different behavior of SparkEnv.get.actorSystem is working as designed, or whether something is missing in the executor setup for this custom-RDD-driven execution case.

Thanks.
Yang
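The workaround above can be packaged so that each JVM decides only once which ActorSystem to use. This is a minimal sketch, assuming Spark 1.3 (where SparkEnv.get.actorSystem exists), Akka 2.3 and Typesafe Config on the classpath. The object name ExecutorActorSystem and the system name "spray-client" are hypothetical, and testing for the top-level `spray` path is an assumed stand-in for "my configuration exists".

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkEnv

// Hypothetical helper: a Scala object, so one instance per JVM (driver or executor).
object ExecutorActorSystem {

  // Assumption: the top-level "spray" path stands in for "my configuration exists";
  // the stack trace shows spray.can.HttpExt failing on exactly this key.
  def hasSprayConfig(config: Config): Boolean = config.hasPath("spray")

  // Evaluated on first access; lazy val initialization is thread-safe,
  // so concurrent tasks in one executor share the same result.
  lazy val get: ActorSystem = {
    val shared = SparkEnv.get.actorSystem // Spark 1.x API
    if (hasSprayConfig(shared.settings.config)) {
      shared
    } else {
      // Load application.conf and reference.conf through the loader that
      // loaded this class, which is the one that can see the application jar.
      val classLoader = getClass.getClassLoader
      val config = ConfigFactory.load(classLoader)
      ActorSystem("spray-client", config, classLoader)
    }
  }
}
```

Inside compute, ExecutorActorSystem.get would replace direct uses of SparkEnv.get.actorSystem. The trade-off of the lazy val is that the fallback ActorSystem is never shut down explicitly; it lives until the executor JVM exits.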
Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos
The configuration is in the jar I passed in. And if I do not create my own RDD for partitioned loading, everything is fine, in which case the task runs in the executor, right? So it seems some special call path before my RDD's compute is triggered makes the configuration get "lost". I will try to see if I can debug further, but any insight into this special call path would be appreciated.

Yang

> On Apr 24, 2015, at 8:14 PM, Reynold Xin wrote:
>
> This looks like a specific Spray configuration issue (or how Spray reads
> config files). Maybe Spray is reading some local config file that doesn't
> exist on your executors?
>
> You might need to email the Spray list.
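One way to debug the "lost" configuration from inside compute is to compare which reference.conf files each class loader can see. The sketch below is a hypothetical diagnostic helper (ReferenceConfProbe is not part of Spark, Akka or Spray); it relies only on the JDK's ClassLoader.getResources and on the fact that ConfigFactory.load() with no arguments uses the thread's context class loader.

```scala
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper (not part of Spark, Akka or Spray).
object ReferenceConfProbe {

  // URLs of every reference.conf visible through `loader`.
  // A null loader stands for the bootstrap loader; nothing is reported for it.
  def referenceConfs(loader: ClassLoader): List[String] =
    if (loader == null) Nil
    else loader.getResources("reference.conf").asScala.map(_.toString).toList

  // Compares the thread context class loader (used by ConfigFactory.load()
  // when no loader is passed) with the loader of an application class.
  def report(appClass: Class[_]): String = {
    val fromContext = referenceConfs(Thread.currentThread.getContextClassLoader)
    val fromAppClass = referenceConfs(appClass.getClassLoader)
    s"context class loader sees: ${fromContext.mkString(", ")}\n" +
      s"application class loader sees: ${fromAppClass.mkString(", ")}"
  }
}
```

For example, printing ReferenceConfProbe.report(classOf[JsonStoreRDD]) from inside JsonStoreRDD.compute writes the result to the executor's stdout. If the jar carrying Spray's reference.conf shows up for only one of the two loaders, that would point to a class-loader difference rather than a jar missing from the executor.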
Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos
This looks like a specific Spray configuration issue (or how Spray reads config files). Maybe Spray is reading some local config file that doesn't exist on your executors?

You might need to email the Spray list.

On Fri, Apr 24, 2015 at 2:38 PM, Yang Lei wrote:
> forward to dev.
Re: Issue of running partitioned loading (RDD) in Spark External Datasource on Mesos
forward to dev.

On Mon, Apr 20, 2015 at 10:46 AM, Yang Lei wrote:
> I implemented two kinds of DataSource: one loads data during buildScan,
> the other returns my RDD class with partition information for later
> loading.
>
> My RDD's compute gets the ActorSystem from SparkEnv.get.actorSystem, then
> uses Spray to interact with an HTTP endpoint, which is the same flow as
> loading data in buildScan. All the Spray dependencies are included in a
> jar that is passed to spark-submit using --jars.
>
> The job is defined in Python.
>
> Both scenarios work when testing locally with --master local[4]. On Mesos,
> the non-partitioned loading works too, but the partitioned loading hits the
> following exception:
>
> Traceback (most recent call last):
>   File "/root/spark-1.3.1-bin-hadoop2.4/../CloudantApp.py", line 78, in <module>
>     for code in airportData.collect():
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/pyspark/sql/dataframe.py", line 293, in collect
>     port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/root/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o60.javaToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage 36.0 (TID 147, 198.11.207.72): com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'spray'
>     at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
>     at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
>     at com.typesafe.config.impl.SimpleConfig.getObject(SimpleConfig.java:218)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:224)
>     at com.typesafe.config.impl.SimpleConfig.getConfig(SimpleConfig.java:33)
>     at spray.can.HttpExt.<init>(Http.scala:143)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>     at scala.util.Try$.apply(Try.scala:161)
>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>     at akka.actor.ExtensionKey.createExtension(Extension.scala:153)
>     at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:711)
>     at akka.actor.ExtensionId$class.apply(Extension.scala:79)
>     at akka.actor.ExtensionKey.apply(Extension.scala:149)
>     at akka.io.IO$.apply(IO.scala:30)
>     at spray.client.pipelining$.sendReceive(pipelining.scala:35)
>     at com.cloudant.spark.common.JsonStoreDataAccess.getQueryResult(JsonStoreDataAccess.scala:118)
>     at com.cloudant.spark.common.JsonStoreDataAccess.getIterator(JsonStoreDataAccess.scala:71)
>     at com.cloudant.spark.common.JsonStoreRDD.compute(JsonStoreRDD.scala:86)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Is this due to some kind of classpath setup issue on the executor for the
> external jar when handling the RDD?
>
> Thanks in advance for any suggestions on how to resolve this.
>
> Yang
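The two DataSource variants in the original message differ mainly in where the HTTP call runs: buildScan is invoked on the driver while the query is planned, whereas an RDD's compute runs inside executor tasks. Below is a hypothetical minimal sketch of the partitioned variant, assuming the Spark 1.x RDD developer API; PagePartition, PagedRDD, the skip/limit paging scheme and fetchPage are illustrative names, not the JsonStoreRDD from the stack trace.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition describing one page of a remote query.
case class PagePartition(index: Int, skip: Int, limit: Int) extends Partition

object PagePartition {
  // Splits `totalRows` into consecutive pages of at most `pageSize` rows.
  def plan(totalRows: Int, pageSize: Int): Array[Partition] = {
    require(pageSize > 0, "pageSize must be positive")
    (0 until totalRows by pageSize).zipWithIndex.map { case (skip, i) =>
      PagePartition(i, skip, math.min(pageSize, totalRows - skip)): Partition
    }.toArray
  }
}

// Hypothetical RDD: partitions are planned on the driver, pages fetched on executors.
class PagedRDD(
    sc: SparkContext,
    totalRows: Int,
    pageSize: Int,
    fetchPage: (Int, Int) => Iterator[String]) // (skip, limit) => rows; shipped with the task
  extends RDD[String](sc, Nil) {

  // Runs on the driver when the job is planned.
  override protected def getPartitions: Array[Partition] =
    PagePartition.plan(totalRows, pageSize)

  // Runs inside an executor task, once per partition: any ActorSystem or
  // HTTP client used here must be obtainable in the executor JVM.
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val page = split.asInstanceOf[PagePartition]
    fetchPage(page.skip, page.limit)
  }
}
```

Because the RDD, including fetchPage, is serialized into each task, everything compute touches (such as an ActorSystem and Spray's configuration) has to be resolvable in the executor JVM, which is where the stack trace above shows the 'spray' key going missing.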