Re: Distribute jar dependencies via sc.AddJar(fileName)
Hi guys,

I think it may be a bug in Spark. I wrote some code to demonstrate it.

Example 1) This is how Spark adds jars today: basically, it adds them to a custom URLClassLoader.
https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
It doesn't work for two reasons: a) we don't pass the custom URLClassLoader to the task, so it's only available in Executor.scala; b) even if we did, we would still have to obtain the class with loader.loadClass(className).newInstance() and look up the method with getDeclaredMethod in order to run it.

Example 2) This one works by loading the class through the loadClass API and then getting and running the method via getDeclaredMethod. Since we don't know which classes users will use, it's not a general solution.
https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java

Example 3) Add the jars to the system class loader so they are accessible to the whole JVM; users can then use the classes directly.
https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java

I'm now porting example 3) to Spark, and will let you know whether it works.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
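A minimal sketch of the general technique behind example 3): adding a jar's URL to the system class loader via reflection, which works on JDK 7/8-era JVMs where the system loader is a URLClassLoader. The jar path and class name below are placeholders, and the actual code in Calling3.java may differ:

    import java.io.File
    import java.net.{URL, URLClassLoader}

    object AddToSystemClassLoader {
      // Append a jar to the system class loader so every class in the JVM can see it.
      def addJar(path: String): Unit = {
        val url: URL = new File(path).toURI.toURL
        val sysLoader = ClassLoader.getSystemClassLoader.asInstanceOf[URLClassLoader]
        // addURL is protected on URLClassLoader, so invoke it reflectively.
        val addUrl = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
        addUrl.setAccessible(true)
        addUrl.invoke(sysLoader, url)
      }

      def main(args: Array[String]): Unit = {
        addJar("/tmp/my-dependency.jar") // placeholder path
        // After this, Class.forName("com.example.Foo") can resolve classes from the jar
        // without going through a custom loader.
      }
    }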
Re: Distribute jar dependencies via sc.AddJar(fileName)
I've experienced the same bug, which I had to work around manually. I posted the details here:
http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster
Re: Distribute jar dependencies via sc.AddJar(fileName)
The jars are actually there (and on the classpath), but you need to load the classes through reflection. I have another thread giving the workaround.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
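For concreteness, a minimal sketch of the reflection workaround described above: instead of referencing the class statically inside the closure, resolve it at runtime through the thread's context class loader, which the executor is assumed to have pointed at the loader holding the added jars (this detail can vary by Spark version). The class, method, and file names are placeholders:

    import org.apache.spark.SparkContext

    object ReflectionWorkaround {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("yarn-standalone", "reflection-workaround")
        sc.addJar("my-dependency.jar") // placeholder; local path on the driver node
        val parsed = sc.textFile("hdfs:///tmp/input.txt").map { line =>
          // Resolve the class at runtime instead of referencing it statically,
          // so nothing needs the class at closure-deserialization time.
          val loader = Thread.currentThread.getContextClassLoader
          val clazz = Class.forName("com.example.MyParser", true, loader)
          val parse = clazz.getDeclaredMethod("parse", classOf[String])
          parse.invoke(clazz.newInstance(), line).asInstanceOf[String]
        }
        println(parsed.count())
        sc.stop()
      }
    }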
Re: Distribute jar dependencies via sc.AddJar(fileName)
Hi Xiangrui,

We're still using the Spark 0.9 branch, and our job is submitted by

    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar YOUR_APP_JAR_FILE \
      --class APP_MAIN_CLASS \
      --args APP_MAIN_ARGUMENTS \
      --num-workers NUMBER_OF_WORKER_MACHINES \
      --master-class ApplicationMaster_CLASS \
      --master-memory MEMORY_FOR_MASTER \
      --worker-memory MEMORY_PER_WORKER \
      --addJars any_local_files_used_in_SparkContext.addJar

Based on my understanding of the code, in yarn-standalone mode the jar is distributed from the local machine to the application master through the distributed cache (using the hadoop yarn-client api). From the application master to the executors, it goes through the http server. I may be wrong, but if you look at the code in SparkContext's addJar method, you can see the jar is added to the http server in yarn-standalone mode:

    if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
      // In order for this to work in yarn standalone mode the user must specify the
      // --addjars option to the client to upload the file into the distributed cache
      // of the AM to make it show up in the current working directory.
      val fileName = new Path(uri.getPath).getName()
      try {
        env.httpFileServer.addJar(new File(fileName))
      } catch {

Those jars are then fetched by the Executor from the http server and added to the class loader of the Executor class, see

    private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
      synchronized {
        // Fetch missing dependencies
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentFiles(name) = timestamp
        }
        for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentJars(name) = timestamp
          // Add it to our class loader
          val localName = name.split("/").last
          val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
          if (!urlClassLoader.getURLs.contains(url)) {
            urlClassLoader.addURL(url)
          }
        }
      }
    }

The problem seems to be that the jars are added to the class loader of the Executor class, and they are not accessible in Task.scala. I verified this by trying to load our custom classes in Executor.scala, and it works. But if I try to load those classes in Task.scala, I get a ClassNotFoundException.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng men...@gmail.com wrote:
In SparkContext#addJar, for yarn-standalone mode, the workers should get the jars from the local distributed cache instead of fetching them from the http server. Could you send the command you used to submit the job? -Xiangrui
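A small diagnostic in the spirit of the verification described above, run from the driver (for example in spark-shell) against an existing SparkContext sc. Here com.example.MyParser stands in for a class that exists only in the jar passed to sc.addJar:

    // From inside a task, report which class loader is installed on the task thread
    // and whether the jar-only class resolves directly and via that loader.
    val report = sc.parallelize(1 to 1, 1).map { _ =>
      val ctx = Thread.currentThread.getContextClassLoader
      val direct =
        try { Class.forName("com.example.MyParser"); "ok" }
        catch { case _: ClassNotFoundException => "ClassNotFoundException" }
      val viaCtx =
        try { Class.forName("com.example.MyParser", true, ctx); "ok" }
        catch { case _: ClassNotFoundException => "ClassNotFoundException" }
      "context loader = " + ctx + ", direct Class.forName = " + direct +
        ", via context loader = " + viaCtx
    }.collect().mkString("\n")
    println(report)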
Re: Distribute jar dependencies via sc.AddJar(fileName)
I don't know whether this would fix the problem. In v0.9, you need `yarn-standalone` instead of `yarn-cluster`. See
https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote:
Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in v0.9.1 and didn't see any special handling of `yarn-cluster`. -Xiangrui

On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote:
We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar dependencies on the command line with the --addJars option. However, those external jars are only available in the driver (the application running in hadoop), and not available in the executors (workers).

After doing some research, we realized that we have to push those jars to the executors from the driver via sc.addJar(fileName). Although the driver's log (see below) shows the jar is successfully added to the http server in the driver, and I confirmed that it's downloadable from any machine in the network, I still get `java.lang.NoClassDefFoundError` in the executors.

    14/05/09 14:51:41 INFO spark.SparkContext: Added JAR analyticshadoop-eba5cdce1.jar at http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp 1399672301568

Then I checked the logs in the executors, and I don't find anything like `Fetching <file> with timestamp <timestamp>`, which implies something is wrong; the executors are not downloading the external jars. Any suggestion what we can look at?

After digging into how Spark distributes external jars, I also wonder about the scalability of this approach. What if there are thousands of nodes downloading the jar from the single http server in the driver? Why don't we push the jars into the HDFS distributed cache by default instead of distributing them via the http server?

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
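To make the failing pattern in the quoted message concrete, a sketch of the usage being discussed: register an extra jar from the driver with sc.addJar and then reference one of its classes inside a closure. The class com.example.analytics.EventParser and the input path are placeholders (this only compiles with the dependency jar on the compile classpath); in the scenario described above, that static reference is what fails with NoClassDefFoundError on the executors when the jar never reaches them:

    import org.apache.spark.SparkContext

    object AddJarUsage {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("yarn-standalone", "addjar-example")
        // Ship the extra jar to executors; the path is local to the driver node.
        sc.addJar("analyticshadoop-eba5cdce1.jar")
        val count = sc.textFile("hdfs:///data/events").map { line =>
          // Statically referenced class that lives only in the added jar.
          new com.example.analytics.EventParser().parse(line)
        }.count()
        println(count)
        sc.stop()
      }
    }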
Re: Distribute jar dependencies via sc.AddJar(fileName)
Hi Xiangrui,

I actually used `yarn-standalone`, sorry for misleading. I did some debugging over the last couple of days, and everything up to updateDependencies in Executor.scala works. I also checked the file size and md5sum of the jars in the executors, and they are the same as the ones in the driver. Gonna do more testing tomorrow.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai
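A sketch of the kind of check mentioned above: compute the MD5 of the fetched jar inside each executor and compare it with the driver's copy. This assumes the jar lands in SparkFiles.getRootDirectory on the executors, as in the updateDependencies snippet quoted earlier in the thread; it is run from the driver (for example spark-shell) with an existing SparkContext sc, and the jar name is the one from this thread:

    // Compute the MD5 of the fetched jar on a few executors and collect the distinct sums.
    val executorMd5s = sc.parallelize(1 to 4, 4).map { _ =>
      import java.io.{File, FileInputStream}
      import java.security.MessageDigest
      // Path where the executor's updateDependencies placed the fetched jar.
      val jar = new File(org.apache.spark.SparkFiles.getRootDirectory,
                         "analyticshadoop-eba5cdce1.jar")
      val md = MessageDigest.getInstance("MD5")
      val in = new FileInputStream(jar)
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) { md.update(buf, 0, n); n = in.read(buf) }
      in.close()
      md.digest().map("%02x".format(_)).mkString
    }.collect().toSet
    println("executor md5 sums: " + executorMd5s)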