Hi Xiangrui,

We're still using the Spark 0.9 branch, and our job is submitted with:
    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar <YOUR_APP_JAR_FILE> \
      --class <APP_MAIN_CLASS> \
      --args <APP_MAIN_ARGUMENTS> \
      --num-workers <NUMBER_OF_WORKER_MACHINES> \
      --master-class <ApplicationMaster_CLASS> \
      --master-memory <MEMORY_FOR_MASTER> \
      --worker-memory <MEMORY_PER_WORKER> \
      --addJars <any_local_files_used_in_SparkContext.addJar>

Based on my understanding of the code, in yarn-standalone mode the jar is distributed from the local machine to the application master through the distributed cache (using the Hadoop YARN client API), while from the application master to the executors it goes through the HTTP server. I may be wrong, but if you look at the SparkContext.addJar method, you can see that in yarn-standalone mode the jar is added to the HTTP server:

    if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
      // In order for this to work in yarn standalone mode the user must specify the
      // --addjars option to the client to upload the file into the distributed cache
      // of the AM to make it show up in the current working directory.
      val fileName = new Path(uri.getPath).getName()
      try {
        env.httpFileServer.addJar(new File(fileName))
      } catch {
        ...

Those jars are then fetched by the executors from the HTTP server and added to the URL classloader of the Executor class; see updateDependencies:

    private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
      synchronized {
        // Fetch missing dependencies
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentFiles(name) = timestamp
        }
        for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
          currentJars(name) = timestamp
          // Add it to our class loader
          val localName = name.split("/").last
          val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
          if (!urlClassLoader.getURLs.contains(url)) {
            urlClassLoader.addURL(url)
          }
        }
      }
    }

The problem seems to be that the jars are added to the Executor's URL classloader, and they are not accessible in Task.scala. I verified this by loading our custom classes from Executor.scala, which works; but when I try to load the same classes from Task.scala, I get a ClassNotFoundException.
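To make the symptom concrete, here is a minimal, self-contained sketch (plain JVM code, not Spark source) of the classloader visibility rule that seems to explain it. The jar name and com.example.MyCustomClass are hypothetical placeholders for our application jar and one of its classes:

    import java.io.File
    import java.net.{URL, URLClassLoader}

    object ClassLoaderVisibilitySketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical placeholder for the jar the executor downloaded.
        val jarUrl: URL = new File("analyticshadoop-eba5cdce1.jar").toURI.toURL

        // Child loader that knows about the jar, analogous to the executor's
        // urlClassLoader after updateDependencies has run.
        val childLoader = new URLClassLoader(Array(jarUrl), getClass.getClassLoader)

        // Resolving through the child loader succeeds, matching what we see
        // when we load our custom classes from Executor.scala.
        val cls = Class.forName("com.example.MyCustomClass", true, childLoader)
        println("Loaded via child loader: " + cls.getName)

        // Resolving with the caller's own (parent) loader fails, matching the
        // ClassNotFoundException we see from Task.scala: the parent loader
        // never had the jar on its classpath and cannot see what the child loaded.
        try {
          Class.forName("com.example.MyCustomClass") // uses this class's own loader
        } catch {
          case e: ClassNotFoundException =>
            println("Parent loader cannot see it: " + e.getMessage)
        }

        // Pointing the thread context classloader at the child loader (and
        // resolving classes through it) is one way such a gap can be closed.
        Thread.currentThread().setContextClassLoader(childLoader)
      }
    }

So if Task.scala resolves classes with its own defining classloader rather than the executor's URL classloader (or a context classloader that delegates to it), that would produce exactly the ClassNotFoundException we observe.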
Thanks.

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng <men...@gmail.com> wrote:
> In SparkContext#addJar, for yarn-standalone mode, the workers should
> get the jars from the local distributed cache instead of fetching them
> from the http server. Could you send the command you used to submit
> the job? -Xiangrui
>
> On Wed, May 14, 2014 at 1:26 AM, DB Tsai <dbt...@stanford.edu> wrote:
> > Hi Xiangrui,
> >
> > I actually used `yarn-standalone`, sorry for misleading. I did debugging in
> > the last couple of days, and everything up to updateDependencies in
> > Executor.scala works. I also checked the file size and md5sum in the
> > executors, and they are the same as the ones in the driver. Gonna do more
> > testing tomorrow.
> >
> > Thanks.
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>
> >> I don't know whether this would fix the problem. In v0.9, you need
> >> `yarn-standalone` instead of `yarn-cluster`.
> >>
> >> See
> >> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
> >>
> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
> >> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
> >> >
> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai <dbt...@stanford.edu> wrote:
> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
> >> >> dependencies on the command line with the "--addJars" option. However,
> >> >> those external jars are only available in the driver (the application
> >> >> running in Hadoop), and not available in the executors (workers).
> >> >>
> >> >> After doing some research, we realized that we have to push those jars
> >> >> to the executors in the driver via sc.addJar(fileName). Although in the
> >> >> driver's log (see the following) the jar is successfully added to the
> >> >> http server in the driver, and I confirmed that it's downloadable from
> >> >> any machine in the network, I still get `java.lang.NoClassDefFoundError`
> >> >> in the executors.
> >> >>
> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> >> >> analyticshadoop-eba5cdce1.jar at
> >> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
> >> >> timestamp 1399672301568
> >> >>
> >> >> Then I checked the logs in the executors, and I don't find any
> >> >> `Fetching <file> with timestamp <timestamp>` line, which implies
> >> >> something is wrong; the executors are not downloading the external jars.
> >> >>
> >> >> Any suggestion what we can look at?
> >> >>
> >> >> After digging into how Spark distributes external jars, I wonder about
> >> >> the scalability of this approach. What if there are thousands of nodes
> >> >> downloading the jar from the single http server in the driver? Why don't
> >> >> we push the jars into the HDFS distributed cache by default instead of
> >> >> distributing them via the http server?
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Sincerely,
> >> >>
> >> >> DB Tsai
> >> >> -------------------------------------------------------
> >> >> My Blog: https://www.dbtsai.com
> >> >> LinkedIn: https://www.linkedin.com/in/dbtsai