Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi guys,

I think it may be a bug in Spark. I wrote some code to demonstrate it.

Example 1) This is how Spark adds jars: basically, it adds them to a
custom URLClassLoader.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java

It doesn't work for two reasons. a) We don't pass the custom
URLClassLoader to the task, so it's only available in Executor.scala.
b) Even if we did, we would still need to obtain the class via
loader.loadClass("ClassName").newInstance(), and look up the Method via
getDeclaredMethod to run it.


Example 2) This works by loading the class through the loadClass API and
then obtaining and invoking the Method via getDeclaredMethod. Since we
don't know in advance which classes users will use, it's not a general
solution.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
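
For illustration, here is a minimal sketch of that reflection-based approach
(this is not the actual Calling2.java; the jar path and class name are made
up, and it assumes the loaded class has a no-arg constructor and a public
run() method):

  import java.io.File;
  import java.lang.reflect.Method;
  import java.net.URL;
  import java.net.URLClassLoader;

  public class ReflectionCallSketch {
    public static void main(String[] args) throws Exception {
      // Hypothetical jar and class name, for illustration only.
      URL jarUrl = new File("/tmp/user-dep.jar").toURI().toURL();
      URLClassLoader loader = new URLClassLoader(
          new URL[] { jarUrl }, ReflectionCallSketch.class.getClassLoader());

      // The class is only visible through this loader, so we go through
      // loadClass + reflection instead of referencing it directly.
      Class<?> clazz = loader.loadClass("com.example.UserFunction");
      Object instance = clazz.newInstance();
      Method run = clazz.getDeclaredMethod("run");
      run.invoke(instance);
    }
  }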


Example 3) Add the jars to the system classloader so they are accessible
everywhere in the JVM. Users can then use the classes directly.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
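
A rough sketch of that approach (again not the actual Calling3.java; it
assumes a Java 7/8-style URLClassLoader as the system classloader, and the
jar path and class name are hypothetical):

  import java.io.File;
  import java.lang.reflect.Method;
  import java.net.URL;
  import java.net.URLClassLoader;

  public class SystemLoaderSketch {
    public static void main(String[] args) throws Exception {
      // Hypothetical jar path, for illustration only.
      URL jarUrl = new File("/tmp/user-dep.jar").toURI().toURL();

      // On Java 7/8 the system classloader is a URLClassLoader, so we can
      // open up its protected addURL method and register the jar with it.
      URLClassLoader systemLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
      Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
      addURL.setAccessible(true);
      addURL.invoke(systemLoader, jarUrl);

      // Classes from the jar are now resolvable by name from ordinary code.
      Class<?> clazz = Class.forName("com.example.UserFunction");
      System.out.println("Loaded " + clazz.getName());
    }
  }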

I'm now porting example 3) to Spark, and will let you know if it works.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote:
 Hi Xiangrui,

 We're still using Spark 0.9 branch, and our job is submitted by

 ./bin/spark-class org.apache.spark.deploy.yarn.Client \
   --jar YOUR_APP_JAR_FILE \
   --class APP_MAIN_CLASS \
   --args APP_MAIN_ARGUMENTS \
   --num-workers NUMBER_OF_WORKER_MACHINES \
  --master-class ApplicationMaster_CLASS \
   --master-memory MEMORY_FOR_MASTER \
   --worker-memory MEMORY_PER_WORKER \
   --addJars any_local_files_used_in_SparkContext.addJar


 Based on my understanding of the code, in yarn-standalone mode the jars are
 distributed from the local machine to the application master through the
 distributed cache (using the hadoop yarn-client api). From the application
 master to the executors, they go through the http server. I may be wrong, but
 if you look at the code in the SparkContext addJar method, you can see that
 the jar is added to the http server in yarn-standalone mode.

 if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
   // In order for this to work in yarn standalone mode the user must specify the
   // --addjars option to the client to upload the file into the distributed cache
   // of the AM to make it show up in the current working directory.
   val fileName = new Path(uri.getPath).getName()
   try {
     env.httpFileServer.addJar(new File(fileName))
   } catch {

 Those jars are then fetched by the Executor from the http server and added to
 the Executor's classloader; see

   private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
     synchronized {
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
         val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
         if (!urlClassLoader.getURLs.contains(url)) {
           urlClassLoader.addURL(url)
         }
       }


 The problem seems to be that the jars are added to the classloader of the
 Executor class, and they are not accessible in Task.scala.

 I verified this by trying to load our custom classes in Executor.scala, and
 it works. But if I try to load those classes in Task.scala, I get a
 ClassNotFoundException.

 Thanks.





 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote:

 In SparkContext#addJar, for yarn-standalone mode, the workers should
 get the jars from local distributed cache instead of fetching them
 from the http server. Could you send the command you used to submit
 the job? -Xiangrui

 On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Xiangrui,
 
  I actually used `yarn-standalone`, sorry for the confusion. I did debugging
  over the last couple of days, and everything up to updateDependencies in
  Executor.scala works. I also checked the file size and md5sum in the
  executors, and they are the same as the ones in the driver. Gonna do more
  testing
  

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread Robert James
I've experienced the same bug, which I had to work around manually. I
posted the details here:
http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster

On 5/15/14, DB Tsai dbt...@stanford.edu wrote:
 Hi guys,

 I think it may be a bug in Spark. I wrote some code to demonstrate it.

 Example 1) This is how Spark adds jars: basically, it adds them to a
 custom URLClassLoader.

 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java

 It doesn't work for two reasons. a) We don't pass the custom
 URLClassLoader to the task, so it's only available in Executor.scala.
 b) Even if we did, we would still need to obtain the class via
 loader.loadClass("ClassName").newInstance(), and look up the Method by
 getDeclaredMethod to run it.


 Example 2) This works by loading the class through the loadClass API and
 then obtaining and invoking the Method via getDeclaredMethod. Since we
 don't know in advance which classes users will use, it's not a general
 solution.

 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java


 Example 3) Add the jars to the system classloader so they are accessible
 everywhere in the JVM. Users can then use the classes directly.

 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java

 I'm now porting example 3) to Spark, and will let you know if it works.

 Thanks.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote:
 Hi Xiangrui,

 We're still using Spark 0.9 branch, and our job is submitted by

 ./bin/spark-class org.apache.spark.deploy.yarn.Client \
   --jar YOUR_APP_JAR_FILE \
   --class APP_MAIN_CLASS \
   --args APP_MAIN_ARGUMENTS \
   --num-workers NUMBER_OF_WORKER_MACHINES \
   --master-class ApplicationMaster_CLASS \
   --master-memory MEMORY_FOR_MASTER \
   --worker-memory MEMORY_PER_WORKER \
   --addJars any_local_files_used_in_SparkContext.addJar


 Based on my understanding of the code, in yarn-standalone mode the jars are
 distributed from the local machine to the application master through the
 distributed cache (using the hadoop yarn-client api). From the application
 master to the executors, they go through the http server. I may be wrong,
 but if you look at the code in the SparkContext addJar method, you can see
 that the jar is added to the http server in yarn-standalone mode.

  if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
    // In order for this to work in yarn standalone mode the user must specify the
    // --addjars option to the client to upload the file into the distributed cache
    // of the AM to make it show up in the current working directory.
    val fileName = new Path(uri.getPath).getName()
    try {
      env.httpFileServer.addJar(new File(fileName))
    } catch {

  Those jars are then fetched by the Executor from the http server and added
  to the Executor's classloader; see

    private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
      synchronized {
        // Fetch missing dependencies
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentFiles(name) = timestamp
        }
        for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentJars(name) = timestamp
          // Add it to our class loader
          val localName = name.split("/").last
          val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
          if (!urlClassLoader.getURLs.contains(url)) {
            urlClassLoader.addURL(url)
          }
        }


  The problem seems to be that the jars are added to the classloader of the
  Executor class, and they are not accessible in Task.scala.

  I verified this by trying to load our custom classes in Executor.scala, and
  it works. But if I try to load those classes in Task.scala, I get a
  ClassNotFoundException.

 Thanks.





 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote:

 In SparkContext#addJar, for yarn-standalone mode, the workers should
 get the jars from local distributed cache instead of fetching them
 from the http server. Could you send the command you used to submit
 the job? -Xiangrui

 On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Xiangrui,
 
  I actually used 

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
The jars are actually there (and on the classpath), but you need to load
them through reflection. I've described the workaround in another thread.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, May 16, 2014 at 1:37 PM, Robert James srobertja...@gmail.comwrote:

 I've experienced the same bug, which I had to work around manually. I
 posted the details here:

 http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster

 On 5/15/14, DB Tsai dbt...@stanford.edu wrote:
  Hi guys,
 
  I think it may be a bug in Spark. I wrote some code to demonstrate it.
 
  Example 1) This is how Spark adds jars: basically, it adds them to a
  custom URLClassLoader.
 
 
 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
 
  It doesn't work for two reasons. a) We don't pass the custom
  URLClassLoader to the task, so it's only available in Executor.scala.
  b) Even if we did, we would still need to obtain the class via
  loader.loadClass("ClassName").newInstance(), and look up the Method by
  getDeclaredMethod to run it.
 
 
  Example 2) This works by loading the class through the loadClass API, and
  then obtaining and invoking the Method via getDeclaredMethod. Since we
  don't know in advance which classes users will use, it's not a general
  solution.
 
 
 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
 
 
  Example 3) Add the jars to the system classloader so they are accessible
  everywhere in the JVM. Users can then use the classes directly.
 
 
 https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
 
  I'm now porting example 3) to Spark, and will let you know if it works.
 
  Thanks.
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Thu, May 15, 2014 at 12:03 PM, DB Tsai dbt...@stanford.edu wrote:
  Hi Xiangrui,
 
  We're still using Spark 0.9 branch, and our job is submitted by
 
  ./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar YOUR_APP_JAR_FILE \
--class APP_MAIN_CLASS \
--args APP_MAIN_ARGUMENTS \
--num-workers NUMBER_OF_WORKER_MACHINES \
    --master-class ApplicationMaster_CLASS \
--master-memory MEMORY_FOR_MASTER \
--worker-memory MEMORY_PER_WORKER \
--addJars any_local_files_used_in_SparkContext.addJar
 
 
  Based on my understanding of the code, in yarn-standalone mode the jars are
  distributed from the local machine to the application master through the
  distributed cache (using the hadoop yarn-client api). From the application
  master to the executors, they go through the http server. I may be wrong,
  but if you look at the code in the SparkContext addJar method, you can see
  that the jar is added to the http server in yarn-standalone mode.
 
   if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
     // In order for this to work in yarn standalone mode the user must specify the
     // --addjars option to the client to upload the file into the distributed cache
     // of the AM to make it show up in the current working directory.
     val fileName = new Path(uri.getPath).getName()
     try {
       env.httpFileServer.addJar(new File(fileName))
     } catch {
 
   Those jars are then fetched by the Executor from the http server and added
   to the Executor's classloader; see
 
     private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
       synchronized {
         // Fetch missing dependencies
         for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
           logInfo("Fetching " + name + " with timestamp " + timestamp)
           Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
           currentFiles(name) = timestamp
         }
         for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
           logInfo("Fetching " + name + " with timestamp " + timestamp)
           Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
           currentJars(name) = timestamp
           // Add it to our class loader
           val localName = name.split("/").last
           val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
           if (!urlClassLoader.getURLs.contains(url)) {
             urlClassLoader.addURL(url)
           }
         }
 
 
   The problem seems to be that the jars are added to the classloader of the
   Executor class, and they are not accessible in Task.scala.

   I verified this by trying to load our custom classes in Executor.scala,
   and it works. But if I try to load those classes in Task.scala, I get a
   ClassNotFoundException.
 
  Thanks.
 
 
 
 
 
  Sincerely,
 
  DB Tsai
  

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-16 Thread DB Tsai
Hi Xiangrui,

We're still using Spark 0.9 branch, and our job is submitted by

./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar YOUR_APP_JAR_FILE \
  --class APP_MAIN_CLASS \
  --args APP_MAIN_ARGUMENTS \
  --num-workers NUMBER_OF_WORKER_MACHINES \
  --master-class ApplicationMaster_CLASS \
  --master-memory MEMORY_FOR_MASTER \
  --worker-memory MEMORY_PER_WORKER \
  --addJars any_local_files_used_in_SparkContext.addJar


Based on my understanding of the code, in yarn-standalone mode the jars are
distributed from the local machine to the application master through the
distributed cache (using the hadoop yarn-client api). From the application
master to the executors, they go through the http server. I may be wrong, but
if you look at the code in the SparkContext addJar method, you can see that
the jar is added to the http server in yarn-standalone mode.

if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
  // In order for this to work in yarn standalone mode the user must specify the
  // --addjars option to the client to upload the file into the distributed cache
  // of the AM to make it show up in the current working directory.
  val fileName = new Path(uri.getPath).getName()
  try {
    env.httpFileServer.addJar(new File(fileName))
  } catch {

Those jars are then fetched by the Executor from the http server and added to
the Executor's classloader; see

  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
    synchronized {
      // Fetch missing dependencies
      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentFiles(name) = timestamp
      }
      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
        logInfo("Fetching " + name + " with timestamp " + timestamp)
        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
        currentJars(name) = timestamp
        // Add it to our class loader
        val localName = name.split("/").last
        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
        if (!urlClassLoader.getURLs.contains(url)) {
          urlClassLoader.addURL(url)
        }
      }


The problem seems to be that the jars are added to the classloader of the
Executor class, and they are not accessible in Task.scala.

I verified this by trying to load our custom classes in Executor.scala, and
it works. But if I try to load those classes in Task.scala, I get a
ClassNotFoundException.
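
To make the mismatch concrete, here is a minimal standalone sketch (with a
hypothetical jar path and class name, not our actual classes): a class that
is only known to a child URLClassLoader is not visible through the loader
that defined the calling code, which is roughly what the Task code runs into.

  import java.io.File;
  import java.net.URL;
  import java.net.URLClassLoader;

  public class LoaderVisibilitySketch {
    public static void main(String[] args) throws Exception {
      // Hypothetical jar and class name, for illustration only.
      URL jarUrl = new File("/tmp/user-dep.jar").toURI().toURL();
      URLClassLoader childLoader = new URLClassLoader(
          new URL[] { jarUrl }, LoaderVisibilitySketch.class.getClassLoader());

      // Resolvable when asked through the child loader that holds the jar...
      System.out.println(Class.forName("com.example.UserFunction", false, childLoader));

      // ...but not through the loader this class was defined in, which is
      // what plain references or Class.forName(name) end up using.
      try {
        Class.forName("com.example.UserFunction");
      } catch (ClassNotFoundException e) {
        System.out.println("Not visible from the defining loader: " + e);
      }
    }
  }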

Thanks.





Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote:

 In SparkContext#addJar, for yarn-standalone mode, the workers should
 get the jars from local distributed cache instead of fetching them
 from the http server. Could you send the command you used to submit
 the job? -Xiangrui

 On Wed, May 14, 2014 at 1:26 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Xiangrui,
 
  I actually used `yarn-standalone`, sorry for the confusion. I did debugging
  over the last couple of days, and everything up to updateDependencies in
  Executor.scala works. I also checked the file size and md5sum in the
  executors, and they are the same as the ones in the driver. Gonna do more
  testing tomorrow.
 
  Thanks.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng men...@gmail.com
 wrote:
 
  I don't know whether this would fix the problem. In v0.9, you need
  `yarn-standalone` instead of `yarn-cluster`.
 
  See
 
 https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
 
  On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com
 wrote:
   Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
   v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
  
   On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu
 wrote:
   We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
   dependencies on the command line with the --addJars option. However, those
   external jars are only available in the driver (the application running in
   hadoop), not in the executors (workers).
  
   After doing some research, we realized that we have to push those jars to
   the executors from the driver via sc.addJar(fileName). Although the driver's
   log (see the following) shows that the jar is successfully added to the http
   server in the driver, and I confirmed that it's downloadable from any machine
   on the network, I still get `java.lang.NoClassDefFoundError` 

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
I don't know whether this would fix the problem. In v0.9, you need
`yarn-standalone` instead of `yarn-cluster`.

See 
https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote:
 Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
 v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui

 On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote:
 We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
 dependencies on the command line with the --addJars option. However, those
 external jars are only available in the driver (the application running in
 hadoop), not in the executors (workers).

 After doing some research, we realized that we have to push those jars to
 the executors from the driver via sc.addJar(fileName). Although the driver's
 log (see the following) shows that the jar is successfully added to the http
 server in the driver, and I confirmed that it's downloadable from any machine
 on the network, I still get `java.lang.NoClassDefFoundError` in the executors.

 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
 analyticshadoop-eba5cdce1.jar at
 http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
 1399672301568

 Then I checked the logs in the executors, and I don't find any `Fetching
 <file> with timestamp <timestamp>` line, which implies something is wrong;
 the executors are not downloading the external jars.

 Any suggestion what we can look at?

 After digging into how Spark distributes external jars, I wonder about the
 scalability of this approach. What if there are thousands of nodes
 downloading the jar from the single http server in the driver? Why don't we
 push the jars into the HDFS distributed cache by default instead of
 distributing them via the http server?

 Thanks.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread DB Tsai
Hi Xiangrui,

I actually used `yarn-standalone`, sorry for the confusion. I did debugging
over the last couple of days, and everything up to updateDependencies in
Executor.scala works. I also checked the file size and md5sum in the
executors, and they are the same as the ones in the driver. Gonna do more
testing tomorrow.

Thanks.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng men...@gmail.com wrote:

 I don't know whether this would fix the problem. In v0.9, you need
 `yarn-standalone` instead of `yarn-cluster`.

 See
 https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

 On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote:
  Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
  v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
 
  On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote:
  We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
  dependencies on the command line with the --addJars option. However, those
  external jars are only available in the driver (the application running in
  hadoop), not in the executors (workers).
 
  After doing some research, we realized that we have to push those jars to
  the executors from the driver via sc.addJar(fileName). Although the driver's
  log (see the following) shows that the jar is successfully added to the http
  server in the driver, and I confirmed that it's downloadable from any machine
  on the network, I still get `java.lang.NoClassDefFoundError` in the executors.
 
  14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
  analyticshadoop-eba5cdce1.jar at
  http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
 timestamp
  1399672301568
 
  Then I checked the logs in the executors, and I don't find any `Fetching
  <file> with timestamp <timestamp>` line, which implies something is wrong;
  the executors are not downloading the external jars.
 
  Any suggestion what we can look at?
 
  After digging into how Spark distributes external jars, I wonder about the
  scalability of this approach. What if there are thousands of nodes
  downloading the jar from the single http server in the driver? Why don't we
  push the jars into the HDFS distributed cache by default instead of
  distributing them via the http server?
 
  Thanks.
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai