Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
After reading the Spark code more carefully, I see that Spark calls
`Thread.currentThread().setContextClassLoader` with the custom
classloader.

However, with this approach the classes still have to be loaded and used via
reflection. See:

http://stackoverflow.com/questions/7452411/thread-currentthread-setcontextclassloader-without-using-reflection
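
To make the reflection requirement concrete, here is a minimal sketch of the
pattern (the jar path and com.example.MyUdf are made-up placeholders, not
anything from our job):

import java.io.File
import java.net.URLClassLoader

object ContextClassLoaderSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder jar containing a class that is NOT on the application classpath.
    val extraJar = new File("/tmp/extra-udfs.jar").toURI.toURL
    val loader = new URLClassLoader(Array(extraJar), getClass.getClassLoader)

    // This mirrors what the executor does: install the loader as the
    // thread's context classloader.
    Thread.currentThread().setContextClassLoader(loader)

    // `new com.example.MyUdf()` would not even compile here, because the class
    // is unknown to the compile-time classpath; we have to go through reflection.
    val clazz = Thread.currentThread().getContextClassLoader.loadClass("com.example.MyUdf")
    val instance = clazz.newInstance()
    val out = clazz.getDeclaredMethod("apply", classOf[String]).invoke(instance, "hello")
    println(out)
  }
}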

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, May 15, 2014 at 4:26 PM, DB Tsai  wrote:
> Hi guys,
>
> I think it may be a bug in Spark. I wrote some code to demonstrate the bug.
>
> Example 1) This is how Spark adds jars. Basically, it adds the jars to a
> customURLClassLoader.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
>
> It doesn't work for two reasons. a) We don't pass the
> customURLClassLoader to the task, so it's only available in
> Executor.scala.  b) Even if we did, we would need to get the class via
> loader.loadClass("Class Name").newInstance(), and get the Method via
> getDeclaredMethod to run it.
>
>
> Example 2) This works by getting the class using the loadClass API, and then
> getting and running the Method via getDeclaredMethod. Since we don't know
> which classes users will use, it's not a general solution.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
>
>
> Example 3) Add the jars to the systemClassLoader so they are accessible in
> the whole JVM. Users can then use the classes directly.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
>
> I'm now porting example 3) to Spark, and will let you know if it works.
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
>> Hi Xiangrui,
>>
>> We're still using Spark 0.9 branch, and our job is submitted by
>>
>> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>>   --jar  \
>>   --class  \
>>   --args  \
>>   --num-workers  \
>>   --master-class 
>>   --master-memory  \
>>   --worker-memory  \
>>   --addJars 
>>
>>
>> Based on my understanding of the code, in yarn-standalone mode the jar is
>> distributed from the local machine to the application master through the
>> distributed cache (using the hadoop yarn-client api). From the application
>> master to the executors, it goes through the http server. I may be wrong,
>> but if you look at the SparkContext addJar method, you can see the jar is
>> added to the http server in yarn-standalone mode.
>>
>> if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
>>   // In order for this to work in yarn standalone mode the user must specify the
>>   // --addjars option to the client to upload the file into the distributed cache
>>   // of the AM to make it show up in the current working directory.
>>   val fileName = new Path(uri.getPath).getName()
>>   try {
>>     env.httpFileServer.addJar(new File(fileName))
>>   } catch {
>>
>> Those jars are fetched by the Executor from the http server and added to
>> the Executor's urlClassLoader; see
>>
>>   private def updateDependencies(newFiles: HashMap[String, Long],
>>                                  newJars: HashMap[String, Long]) {
>>     synchronized {
>>       // Fetch missing dependencies
>>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>>         currentFiles(name) = timestamp
>>       }
>>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>>         currentJars(name) = timestamp
>>         // Add it to our class loader
>>         val localName = name.split("/").last
>>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
>>         if (!urlClassLoader.getURLs.contains(url)) {
>>           urlClassLoader.addURL(url)
>>         }
>>       }
>>
>>
>> The problem seems to be that the jars are added to the Executor's
>> classloader, and they are not accessible in Task.scala.
>>
>> I verified this by loading our custom classes in Executor.scala, which
>> works. But if I try to load those classes in Task.scala, I get a
>> ClassNotFoundException.
>>
>> Thanks.
>>
>>
>>
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:
>>>
>>> In SparkContext#addJar, for yarn-sta

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi Xiangrui,

We're still using Spark 0.9 branch, and our job is submitted by

./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar  \
  --class  \
  --args  \
  --num-workers  \
  --master-class 
  --master-memory  \
  --worker-memory  \
  --addJars 


Based on my understanding of the code, in yarn-standalone mode the jar is
distributed from the local machine to the application master through the
distributed cache (using the hadoop yarn-client api). From the application
master to the executors, it goes through the http server. I may be wrong,
but if you look at the SparkContext addJar method, you can see the jar is
added to the http server in yarn-standalone mode.

if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
  // In order for this to work in yarn standalone mode the user must specify the
  // --addjars option to the client to upload the file into the distributed cache
  // of the AM to make it show up in the current working directory.
  val fileName = new Path(uri.getPath).getName()
  try {
    env.httpFileServer.addJar(new File(fileName))
  } catch {

Those jars are fetched by the Executor from the http server and added to the
Executor's urlClassLoader; see

  private def updateDependencies(newFiles: HashMap[String, Long],
                                 newJars: HashMap[String, Long]) {
    synchronized {
      // Fetch missing dependencies
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentFiles(name) = timestamp
      }
      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentJars(name) = timestamp
        // Add it to our class loader
        val localName = name.split("/").last
        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
        if (!urlClassLoader.getURLs.contains(url)) {
          urlClassLoader.addURL(url)
        }
      }


The problem seems to be that the jars are added to the Executor's
classloader, and they are not accessible in Task.scala.

I verified this by loading our custom classes in Executor.scala, which works.
But if I try to load those classes in Task.scala, I get a
ClassNotFoundException.
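
As a rough illustration (this is a sketch, not the code we actually ran;
com.example.MyUdf just stands in for one of our --addJars classes), the
difference looks like this when probed from task code:

// Hypothetical probe run inside a task, given the executor's URLClassLoader.
def probeClassLoaders(executorUrlClassLoader: ClassLoader): Unit = {
  try {
    // Resolves against the loader that defined the task's own code
    // (the application classpath), which never saw the fetched jars.
    Class.forName("com.example.MyUdf")
    println("visible via Class.forName")
  } catch {
    case _: ClassNotFoundException =>
      println("NOT visible via Class.forName -- what we see from Task.scala")
  }

  // Resolves against the executor's URLClassLoader, which had addURL()
  // called on it for every fetched jar, so this succeeds.
  val clazz = executorUrlClassLoader.loadClass("com.example.MyUdf")
  println("visible via the executor's urlClassLoader: " + clazz.getName)
}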

Thanks.





Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:

> In SparkContext#addJar, for yarn-standalone mode, the workers should
> get the jars from local distributed cache instead of fetching them
> from the http server. Could you send the command you used to submit
> the job? -Xiangrui
>
> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
> > Hi Xiangrui,
> >
> > I actually used `yarn-standalone`; sorry for the confusion. I did some
> > debugging over the last couple of days, and everything up to
> > updateDependencies in Executor.scala works. I also checked the file size
> > and md5sum in the executors, and they are the same as the ones in the
> > driver. I'm going to do more testing tomorrow.
> >
> > Thanks.
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng 
> wrote:
> >>
> >> I don't know whether this would fix the problem. In v0.9, you need
> >> `yarn-standalone` instead of `yarn-cluster`.
> >>
> >> See
> >>
> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
> >>
> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng 
> wrote:
> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> >> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
> >> >
> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai 
> wrote:
> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> >> >> dependencies on the command line with the "--addJars" option. However,
> >> >> those external jars are only available in the driver (the application
> >> >> running in hadoop), not in the executors (workers).
> >> >>
> >> >> After doing some research, we realized that we have to push those jars
> >> >> to the executors from the driver via sc.addJar(fileName). Although the
> >> >> driver's log (see below) shows the jar is successfully added to the
> >> >> http server in the driver, and I confirmed that it's downloadable from
> >> >> any machine on the network, I still get
> >> >> `java.lang.NoClassDefFoundError` in the executors.
> >> >>
> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Adde

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
The jars are actually there (and on the classpath), but you need to load the
classes through reflection. I have another thread describing the workaround.
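
Roughly, the workaround looks like this inside a job (a sketch only; the jar
path, com.example.MyUdf, and its transform method are placeholders, and it
assumes Spark has already set the task thread's context classloader to the
loader that received the fetched jars):

// Sketch: using a class from an sc.addJar()-distributed jar inside a task.
sc.addJar("/path/to/extra-udfs.jar")

val out = sc.parallelize(Seq("a", "b", "c")).map { value =>
  // We can't write `new com.example.MyUdf()` in the closure, so go through
  // the context classloader that the executor set up for this task thread.
  val loader = Thread.currentThread().getContextClassLoader
  val clazz  = loader.loadClass("com.example.MyUdf")
  val udf    = clazz.newInstance()
  clazz.getDeclaredMethod("transform", classOf[String])
       .invoke(udf, value)
       .asInstanceOf[String]
}.collect()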


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, May 16, 2014 at 1:37 PM, Robert James wrote:

> I've experienced the same bug, which I had to work around manually.  I
> posted the details here:
>
> http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster
>
> On 5/15/14, DB Tsai  wrote:
> > Hi guys,
> >
> > I think it may be a bug in Spark. I wrote some code to demonstrate the
> > bug.
> >
> > Example 1) This is how Spark adds jars. Basically, it adds the jars to a
> > customURLClassLoader.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
> >
> > It doesn't work for two reasons. a) We don't pass the
> > customURLClassLoader to the task, so it's only available in
> > Executor.scala.  b) Even if we did, we would need to get the class via
> > loader.loadClass("Class Name").newInstance(), and get the Method via
> > getDeclaredMethod to run it.
> >
> >
> > Example 2) This works by getting the class using the loadClass API, and
> > then getting and running the Method via getDeclaredMethod. Since we don't
> > know which classes users will use, it's not a general solution.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
> >
> >
> > Example 3) Add the jars to the systemClassLoader so they are accessible
> > in the whole JVM. Users can then use the classes directly.
> >
> >
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
> >
> > I'm now porting example 3) to Spark, and will let you know if it works.
> >
> > Thanks.
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
> >> Hi Xiangrui,
> >>
> >> We're still using Spark 0.9 branch, and our job is submitted by
> >>
> >> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
> >>   --jar  \
> >>   --class  \
> >>   --args  \
> >>   --num-workers  \
> >>   --master-class 
> >>   --master-memory  \
> >>   --worker-memory  \
> >>   --addJars 
> >>
> >>
> >> Based on my understanding of the code, in yarn-standalone mode the jar
> >> is distributed from the local machine to the application master through
> >> the distributed cache (using the hadoop yarn-client api). From the
> >> application master to the executors, it goes through the http server. I
> >> may be wrong, but if you look at the SparkContext addJar method, you can
> >> see the jar is added to the http server in yarn-standalone mode.
> >>
> >> if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
> >>   // In order for this to work in yarn standalone mode the user must specify the
> >>   // --addjars option to the client to upload the file into the distributed cache
> >>   // of the AM to make it show up in the current working directory.
> >>   val fileName = new Path(uri.getPath).getName()
> >>   try {
> >>     env.httpFileServer.addJar(new File(fileName))
> >>   } catch {
> >>
> >> Those jars are fetched by the Executor from the http server and added to
> >> the Executor's urlClassLoader; see
> >>
> >>   private def updateDependencies(newFiles: HashMap[String, Long],
> >>                                  newJars: HashMap[String, Long]) {
> >>     synchronized {
> >>       // Fetch missing dependencies
> >>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
> >>         logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>         currentFiles(name) = timestamp
> >>       }
> >>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
> >>         logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>         currentJars(name) = timestamp
> >>         // Add it to our class loader
> >>         val localName = name.split("/").last
> >>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
> >>         if (!urlClassLoader.getURLs.contains(url)) {
> >>           urlClassLoader.addURL(url)
> >>         }
> >>       }
> >>
> >>
> >> The problem seems to be that the jars are added to the Executor's
> >> classloader, and they are not accessible in Task.scala.
> >>
> >> I verified this by loading our custom classes in Executor.scala, which
> >> works. But if I try to load those classes in Task.scala, I get a
> >> ClassNotFoundException.

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread Robert James
I've experienced the same bug, which I had to work around manually.  I
posted the details here:
http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster

On 5/15/14, DB Tsai  wrote:
> Hi guys,
>
> I think it may be a bug in Spark. I wrote some code to demonstrate the bug.
>
> Example 1) This is how Spark adds jars. Basically, it adds the jars to a
> customURLClassLoader.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
>
> It doesn't work for two reasons. a) We don't pass the
> customURLClassLoader to the task, so it's only available in
> Executor.scala.  b) Even if we did, we would need to get the class via
> loader.loadClass("Class Name").newInstance(), and get the Method via
> getDeclaredMethod to run it.
>
>
> Example 2) This works by getting the class using the loadClass API, and then
> getting and running the Method via getDeclaredMethod. Since we don't know
> which classes users will use, it's not a general solution.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
>
>
> Example 3) Add the jars to the systemClassLoader so they are accessible in
> the whole JVM. Users can then use the classes directly.
>
> https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
>
> I'm now porting example 3) to Spark, and will let you know if it works.
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
>> Hi Xiangrui,
>>
>> We're still using Spark 0.9 branch, and our job is submitted by
>>
>> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>>   --jar  \
>>   --class  \
>>   --args  \
>>   --num-workers  \
>>   --master-class 
>>   --master-memory  \
>>   --worker-memory  \
>>   --addJars 
>>
>>
>> Based on my understanding of the code, in yarn-standalone mode the jar is
>> distributed from the local machine to the application master through the
>> distributed cache (using the hadoop yarn-client api). From the application
>> master to the executors, it goes through the http server. I may be wrong,
>> but if you look at the SparkContext addJar method, you can see the jar is
>> added to the http server in yarn-standalone mode.
>>
>> if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
>>   // In order for this to work in yarn standalone mode the user must specify the
>>   // --addjars option to the client to upload the file into the distributed cache
>>   // of the AM to make it show up in the current working directory.
>>   val fileName = new Path(uri.getPath).getName()
>>   try {
>>     env.httpFileServer.addJar(new File(fileName))
>>   } catch {
>>
>> Those jars are fetched by the Executor from the http server and added to
>> the Executor's urlClassLoader; see
>>
>>   private def updateDependencies(newFiles: HashMap[String, Long],
>>                                  newJars: HashMap[String, Long]) {
>>     synchronized {
>>       // Fetch missing dependencies
>>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>>         currentFiles(name) = timestamp
>>       }
>>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
>>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>>         currentJars(name) = timestamp
>>         // Add it to our class loader
>>         val localName = name.split("/").last
>>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
>>         if (!urlClassLoader.getURLs.contains(url)) {
>>           urlClassLoader.addURL(url)
>>         }
>>       }
>>
>>
>> The problem seems to be that the jars are added to the Executor's
>> classloader, and they are not accessible in Task.scala.
>>
>> I verified this by loading our custom classes in Executor.scala, which
>> works. But if I try to load those classes in Task.scala, I get a
>> ClassNotFoundException.
>>
>> Thanks.
>>
>>
>>
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>>
>> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:
>>>
>>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>>> get the jars from local distributed cache instead of fetching them
>>> from the http server. Could you send the command you used to submit
>>> the job? -Xiangrui
>>>
>>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
>>> > Hi Xiangrui,
>

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi guys,

I think it may be a bug in Spark. I wrote some code to demonstrate the bug.

Example 1) This is how Spark adds jars. Basically, it adds the jars to a
customURLClassLoader.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java

It doesn't work for two reasons. a) We don't pass the
customURLClassLoader to the task, so it's only available in
Executor.scala.  b) Even if we did, we would need to get the class via
loader.loadClass("Class Name").newInstance(), and get the Method via
getDeclaredMethod to run it.


Example 2) This works by getting the class using the loadClass API, and then
getting and running the Method via getDeclaredMethod. Since we don't know
which classes users will use, it's not a general solution.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java


Example 3) Add the jars to the systemClassLoader so they are accessible in
the whole JVM. Users can then use the classes directly.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
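
For context, the trick behind example 3) relies on the system classloader
being a URLClassLoader on the JVMs we run (Java 6/7), whose protected addURL
method can be invoked reflectively. A rough sketch of the idea (not the
actual Calling3.java; the jar path is a placeholder):

import java.io.File
import java.net.{URL, URLClassLoader}

object SystemClassLoaderSketch {
  /** Reflectively call the protected URLClassLoader.addURL on the system classloader. */
  def addJarToSystemClassLoader(jarPath: String): Unit = {
    val url = new File(jarPath).toURI.toURL
    val systemLoader = ClassLoader.getSystemClassLoader.asInstanceOf[URLClassLoader]
    val addURL = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
    addURL.setAccessible(true)        // addURL is protected, so open it up
    addURL.invoke(systemLoader, url)  // after this the jar is visible JVM-wide
  }
}

// e.g. SystemClassLoaderSketch.addJarToSystemClassLoader("/tmp/extra-udfs.jar")
// followed by Class.forName("com.example.MyUdf") resolving without a custom loader.

Because the system classloader now defines those classes, user code can
reference them directly, which is why example 3) needs no reflection at the
call site.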

I'm now porting example 3) to Spark, and will let you know if it works.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, May 15, 2014 at 12:03 PM, DB Tsai  wrote:
> Hi Xiangrui,
>
> We're still using Spark 0.9 branch, and our job is submitted by
>
> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>   --jar  \
>   --class  \
>   --args  \
>   --num-workers  \
>   --master-class 
>   --master-memory  \
>   --worker-memory  \
>   --addJars 
>
>
> Based on my understanding of the code, in yarn-standalone mode the jar is
> distributed from the local machine to the application master through the
> distributed cache (using the hadoop yarn-client api). From the application
> master to the executors, it goes through the http server. I may be wrong,
> but if you look at the SparkContext addJar method, you can see the jar is
> added to the http server in yarn-standalone mode.
>
> if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
>   // In order for this to work in yarn standalone mode the user must specify the
>   // --addjars option to the client to upload the file into the distributed cache
>   // of the AM to make it show up in the current working directory.
>   val fileName = new Path(uri.getPath).getName()
>   try {
>     env.httpFileServer.addJar(new File(fileName))
>   } catch {
>
> Those jars are fetched by the Executor from the http server and added to
> the Executor's urlClassLoader; see
>
>   private def updateDependencies(newFiles: HashMap[String, Long],
>                                  newJars: HashMap[String, Long]) {
>     synchronized {
>       // Fetch missing dependencies
>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentFiles(name) = timestamp
>       }
>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentJars(name) = timestamp
>         // Add it to our class loader
>         val localName = name.split("/").last
>         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
>         if (!urlClassLoader.getURLs.contains(url)) {
>           urlClassLoader.addURL(url)
>         }
>       }
>
>
> The problem seems to be that the jars are added to the Executor's
> classloader, and they are not accessible in Task.scala.
>
> I verified this by loading our custom classes in Executor.scala, which
> works. But if I try to load those classes in Task.scala, I get a
> ClassNotFoundException.
>
> Thanks.
>
>
>
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng  wrote:
>>
>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>> get the jars from local distributed cache instead of fetching them
>> from the http server. Could you send the command you used to submit
>> the job? -Xiangrui
>>
>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
>> > Hi Xiangrui,
>> >
>> > I actually used `yarn-standalone`; sorry for the confusion. I did some
>> > debugging over the last couple of days, and everything up to
>> > updateDependencies in Executor.scala works. I also checked the file size
>> > and md5sum in the executors, and they are the same as the ones in the
>> > driver. I'm going to do more testing tomorrow.
>> >
>> > Thanks.
>> >
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > 

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-15 Thread Xiangrui Meng
In SparkContext#addJar, for yarn-standalone mode, the workers should
get the jars from local distributed cache instead of fetching them
from the http server. Could you send the command you used to submit
the job? -Xiangrui

On Wed, May 14, 2014 at 1:26 AM, DB Tsai  wrote:
> Hi Xiangrui,
>
> I actually used `yarn-standalone`; sorry for the confusion. I did some
> debugging over the last couple of days, and everything up to
> updateDependencies in Executor.scala works. I also checked the file size
> and md5sum in the executors, and they are the same as the ones in the
> driver. I'm going to do more testing tomorrow.
>
> Thanks.
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng  wrote:
>>
>> I don't know whether this would fix the problem. In v0.9, you need
>> `yarn-standalone` instead of `yarn-cluster`.
>>
>> See
>> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
>>
>> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng  wrote:
>> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
>> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>> >
>> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> >> dependencies on the command line with the "--addJars" option. However,
> >> those external jars are only available in the driver (the application
> >> running in hadoop), not in the executors (workers).
> >>
> >> After doing some research, we realized that we have to push those jars
> >> to the executors from the driver via sc.addJar(fileName). Although the
> >> driver's log (see below) shows the jar is successfully added to the
> >> http server in the driver, and I confirmed that it's downloadable from
> >> any machine on the network, I still get
> >> `java.lang.NoClassDefFoundError` in the executors.
> >>
> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> >> analyticshadoop-eba5cdce1.jar at
> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
> >> 1399672301568
> >>
> >> Then I checked the logs on the executors, and I don't find any
> >> `Fetching <jar> with timestamp <timestamp>` lines, which implies
> >> something is wrong; the executors are not downloading the external jars.
> >>
> >> Any suggestions on what we can look at?
> >>
> >> After digging into how Spark distributes external jars, I wonder about
> >> the scalability of this approach. What if there are thousands of nodes
> >> downloading the jar from a single http server in the driver? Why don't
> >> we push the jars into the HDFS distributed cache by default instead of
> >> distributing them via the http server?
>> >>
>> >> Thanks.
>> >>
>> >> Sincerely,
>> >>
>> >> DB Tsai
>> >> ---
>> >> My Blog: https://www.dbtsai.com
>> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread DB Tsai
Hi Xiangrui,

I actually used `yarn-standalone`; sorry for the confusion. I did some
debugging over the last couple of days, and everything up to
updateDependencies in Executor.scala works. I also checked the file size and
md5sum in the executors, and they are the same as the ones in the driver. I'm
going to do more testing tomorrow.

Thanks.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng  wrote:

> I don't know whether this would fix the problem. In v0.9, you need
> `yarn-standalone` instead of `yarn-cluster`.
>
> See
> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
>
> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng  wrote:
> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
> >
> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> >> dependencies on the command line with the "--addJars" option. However,
> >> those external jars are only available in the driver (the application
> >> running in hadoop), not in the executors (workers).
> >>
> >> After doing some research, we realized that we have to push those jars
> >> to the executors from the driver via sc.addJar(fileName). Although the
> >> driver's log (see below) shows the jar is successfully added to the
> >> http server in the driver, and I confirmed that it's downloadable from
> >> any machine on the network, I still get
> >> `java.lang.NoClassDefFoundError` in the executors.
> >>
> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> >> analyticshadoop-eba5cdce1.jar at
> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
> >> 1399672301568
> >>
> >> Then I checked the logs on the executors, and I don't find any
> >> `Fetching <jar> with timestamp <timestamp>` lines, which implies
> >> something is wrong; the executors are not downloading the external jars.
> >>
> >> Any suggestions on what we can look at?
> >>
> >> After digging into how Spark distributes external jars, I wonder about
> >> the scalability of this approach. What if there are thousands of nodes
> >> downloading the jar from a single http server in the driver? Why don't
> >> we push the jars into the HDFS distributed cache by default instead of
> >> distributing them via the http server?
> >>
> >> Thanks.
> >>
> >> Sincerely,
> >>
> >> DB Tsai
> >> ---
> >> My Blog: https://www.dbtsai.com
> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
I don't know whether this would fix the problem. In v0.9, you need
`yarn-standalone` instead of `yarn-cluster`.

See 
https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng  wrote:
> Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>
> On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
>> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
>> dependencies on the command line with the "--addJars" option. However,
>> those external jars are only available in the driver (the application
>> running in hadoop), not in the executors (workers).
>>
>> After doing some research, we realized that we have to push those jars to
>> the executors from the driver via sc.addJar(fileName). Although the
>> driver's log (see below) shows the jar is successfully added to the http
>> server in the driver, and I confirmed that it's downloadable from any
>> machine on the network, I still get `java.lang.NoClassDefFoundError` in
>> the executors.
>>
>> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
>> analyticshadoop-eba5cdce1.jar at
>> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
>> 1399672301568
>>
>> Then I checked the logs on the executors, and I don't find any
>> `Fetching <jar> with timestamp <timestamp>` lines, which implies something
>> is wrong; the executors are not downloading the external jars.
>>
>> Any suggestions on what we can look at?
>>
>> After digging into how Spark distributes external jars, I wonder about the
>> scalability of this approach. What if there are thousands of nodes
>> downloading the jar from a single http server in the driver? Why don't we
>> push the jars into the HDFS distributed cache by default instead of
>> distributing them via the http server?
>>
>> Thanks.
>>
>> Sincerely,
>>
>> DB Tsai
>> ---
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui

On Mon, May 12, 2014 at 11:14 AM, DB Tsai  wrote:
> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> dependencies on the command line with the "--addJars" option. However,
> those external jars are only available in the driver (the application
> running in hadoop), not in the executors (workers).
>
> After doing some research, we realized that we have to push those jars to
> the executors from the driver via sc.addJar(fileName). Although the
> driver's log (see below) shows the jar is successfully added to the http
> server in the driver, and I confirmed that it's downloadable from any
> machine on the network, I still get `java.lang.NoClassDefFoundError` in
> the executors.
>
> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> analyticshadoop-eba5cdce1.jar at
> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
> 1399672301568
>
> Then I checked the logs on the executors, and I don't find any
> `Fetching <jar> with timestamp <timestamp>` lines, which implies something
> is wrong; the executors are not downloading the external jars.
>
> Any suggestions on what we can look at?
>
> After digging into how Spark distributes external jars, I wonder about the
> scalability of this approach. What if there are thousands of nodes
> downloading the jar from a single http server in the driver? Why don't we
> push the jars into the HDFS distributed cache by default instead of
> distributing them via the http server?
>
> Thanks.
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai