The jars are actually there (and on the classpath), but you need to load them through reflection. I have another thread giving the workaround.
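The workaround is along these lines (a sketch only, not the exact code from that thread; the jar path, class, and method names are placeholders):

    // Build a loader that has the user jar on its URLs, then drive the class
    // reflectively so nothing in the task references it statically.
    import java.io.File
    import java.net.URLClassLoader

    val loader = new URLClassLoader(Array(new File("/path/to/user-dep.jar").toURI.toURL),
                                    getClass.getClassLoader)
    val clazz  = loader.loadClass("com.example.MyTransformer")
    val method = clazz.getDeclaredMethod("transform", classOf[String])
    val result = method.invoke(clazz.newInstance(), "some input")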
Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, May 16, 2014 at 1:37 PM, Robert James <srobertja...@gmail.com> wrote:
> I've experienced the same bug, which I had to work around manually. I
> posted the details here:
>
> http://stackoverflow.com/questions/23687081/spark-workers-unable-to-find-jar-on-ec2-cluster
>
> On 5/15/14, DB Tsai <dbt...@stanford.edu> wrote:
> > Hi guys,
> >
> > I think it may be a bug in Spark. I wrote some code to demonstrate it.
> >
> > Example 1) This is how Spark adds jars. Basically, it adds the jars to a
> > customURLClassLoader:
> >
> > https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java
> >
> > It doesn't work for two reasons: a) we don't pass the customURLClassLoader
> > to the task, so it's only available in Executor.scala; b) even if we did,
> > we would need to get the class via loader.loadClass("Class Name").newInstance()
> > and get the Method via getDeclaredMethod in order to run it.
> >
> > Example 2) This works by getting the class using the loadClass API, then
> > getting and running the Method via getDeclaredMethod. Since we don't know
> > which classes users will use, it's not a solution:
> >
> > https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java
> >
> > Example 3) Add the jars to the systemClassLoader and have them accessible
> > in the whole JVM, so users can use the classes directly:
> >
> > https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java
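For reference, a minimal sketch of the Example 3 trick, assuming a pre-Java-9 JVM where the system classloader is a URLClassLoader (the jar path is a placeholder):

    // addURL is protected on URLClassLoader, so open it up via reflection and
    // push the jar onto the system classloader; its classes then resolve directly.
    import java.io.File
    import java.net.{URL, URLClassLoader}

    val sysLoader = ClassLoader.getSystemClassLoader.asInstanceOf[URLClassLoader]
    val addURL    = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
    addURL.setAccessible(true)
    addURL.invoke(sysLoader, new File("/path/to/user-dep.jar").toURI.toURL)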
> > I'm now porting example 3) to Spark, and will let you know if it works.
> >
> > Thanks.
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Thu, May 15, 2014 at 12:03 PM, DB Tsai <dbt...@stanford.edu> wrote:
> >> Hi Xiangrui,
> >>
> >> We're still using the Spark 0.9 branch, and our job is submitted with:
> >>
> >> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
> >>   --jar <YOUR_APP_JAR_FILE> \
> >>   --class <APP_MAIN_CLASS> \
> >>   --args <APP_MAIN_ARGUMENTS> \
> >>   --num-workers <NUMBER_OF_WORKER_MACHINES> \
> >>   --master-class <ApplicationMaster_CLASS> \
> >>   --master-memory <MEMORY_FOR_MASTER> \
> >>   --worker-memory <MEMORY_PER_WORKER> \
> >>   --addJars <any_local_files_used_in_SparkContext.addJar>
> >>
> >> Based on my understanding of the code, in yarn-standalone mode the jar is
> >> distributed from the local machine to the application master through the
> >> distributed cache (using the hadoop yarn-client api). From the application
> >> master to the executors, it goes through the http server. I may be wrong,
> >> but if you look at the SparkContext.addJar method, you can see the jar is
> >> added to the http server in yarn-standalone mode:
> >>
> >>     if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") {
> >>       // In order for this to work in yarn standalone mode the user must specify the
> >>       // --addjars option to the client to upload the file into the distributed cache
> >>       // of the AM to make it show up in the current working directory.
> >>       val fileName = new Path(uri.getPath).getName()
> >>       try {
> >>         env.httpFileServer.addJar(new File(fileName))
> >>       } catch {
> >>
> >> Those jars are fetched by the Executor from the http server and added to
> >> the classloader of the "Executor" class; see updateDependencies:
> >>
> >>     private def updateDependencies(newFiles: HashMap[String, Long],
> >>         newJars: HashMap[String, Long]) {
> >>       synchronized {
> >>         // Fetch missing dependencies
> >>         for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
> >>           logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>           Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>           currentFiles(name) = timestamp
> >>         }
> >>         for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
> >>           logInfo("Fetching " + name + " with timestamp " + timestamp)
> >>           Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
> >>           currentJars(name) = timestamp
> >>           // Add it to our class loader
> >>           val localName = name.split("/").last
> >>           val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
> >>           if (!urlClassLoader.getURLs.contains(url)) {
> >>             urlClassLoader.addURL(url)
> >>           }
> >>         }
> >>
> >> The problem seems to be that the jars are added to the classloader of the
> >> "Executor" class, and they are not accessible in Task.scala (see the
> >> sketch after this message).
> >>
> >> I verified this by trying to load our custom classes in Executor.scala,
> >> and it works. But if I try to load those classes in Task.scala, I get a
> >> ClassNotFoundException.
> >>
> >> Thanks.
> >>
> >> Sincerely,
> >>
> >> DB Tsai
> >> -------------------------------------------------------
> >> My Blog: https://www.dbtsai.com
> >> LinkedIn: https://www.linkedin.com/in/dbtsai
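To make the gap concrete, here is a sketch (not Spark code) of what would close it: install the jar-bearing loader as the task thread's context classloader, so code running in the task can resolve user classes. The jar path and class name are placeholders, and the new URLClassLoader stands in for the Executor's urlClassLoader quoted above:

    import java.io.File
    import java.net.URLClassLoader

    // Stand-in for the loader populated in updateDependencies.
    val jarLoader = new URLClassLoader(Array(new File("/path/to/fetched.jar").toURI.toURL),
                                       getClass.getClassLoader)
    Thread.currentThread.setContextClassLoader(jarLoader)

    // Code running later on this thread (e.g. in a Task) can now resolve user classes:
    val clazz = Thread.currentThread.getContextClassLoader.loadClass("com.example.UserFunction")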
> >>
> >> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>>
> >>> In SparkContext#addJar, for yarn-standalone mode, the workers should
> >>> get the jars from the local distributed cache instead of fetching them
> >>> from the http server. Could you send the command you used to submit
> >>> the job? -Xiangrui
> >>>
> >>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai <dbt...@stanford.edu> wrote:
> >>> > Hi Xiangrui,
> >>> >
> >>> > I actually used `yarn-standalone`; sorry for the confusion. I did some
> >>> > debugging over the last couple of days, and everything up to
> >>> > updateDependencies in executor.scala works. I also checked the file
> >>> > size and md5sum of the jars in the executors, and they are the same
> >>> > as the ones in the driver. Gonna do more testing tomorrow.
> >>> >
> >>> > Thanks.
> >>> >
> >>> > Sincerely,
> >>> >
> >>> > DB Tsai
> >>> > -------------------------------------------------------
> >>> > My Blog: https://www.dbtsai.com
> >>> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >>> >
> >>> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>> >>
> >>> >> I don't know whether this would fix the problem, but in v0.9 you need
> >>> >> `yarn-standalone` instead of `yarn-cluster`.
> >>> >>
> >>> >> See
> >>> >> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
> >>> >>
> >>> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala
> >>> >> > in v0.9.1 and didn't see special handling of `yarn-cluster`.
> >>> >> > -Xiangrui
> >>> >> >
> >>> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai <dbt...@stanford.edu> wrote:
> >>> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add
> >>> >> >> jar dependencies on the command line with the "--addJars" option.
> >>> >> >> However, those external jars are only available in the driver (the
> >>> >> >> application running in hadoop), and not available in the executors
> >>> >> >> (workers).
> >>> >> >>
> >>> >> >> After doing some research, we realized that we have to push those
> >>> >> >> jars to the executors from the driver via sc.addJar(fileName).
> >>> >> >> Although the driver's log (see the following) shows the jar being
> >>> >> >> successfully added to the http server in the driver, and I
> >>> >> >> confirmed that it's downloadable from any machine in the network,
> >>> >> >> I still get `java.lang.NoClassDefFoundError` in the executors.
> >>> >> >>
> >>> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
> >>> >> >> analyticshadoop-eba5cdce1.jar at
> >>> >> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
> >>> >> >> timestamp 1399672301568
> >>> >> >>
> >>> >> >> When I then check the logs in the executors, I don't find any
> >>> >> >> `Fetching <file> with timestamp <timestamp>` lines, which implies
> >>> >> >> something is wrong; the executors are not downloading the external
> >>> >> >> jars.
> >>> >> >>
> >>> >> >> Any suggestion what we can look at?
> >>> >> >>
> >>> >> >> After digging into how Spark distributes external jars, I also
> >>> >> >> wonder about the scalability of this approach. What if there are
> >>> >> >> thousands of nodes downloading the jar from the single http server
> >>> >> >> in the driver? Why don't we push the jars into the HDFS distributed
> >>> >> >> cache by default instead of distributing them via the http server?
> >>> >> >>
> >>> >> >> Thanks.
> >>> >> >>
> >>> >> >> Sincerely,
> >>> >> >>
> >>> >> >> DB Tsai
> >>> >> >> -------------------------------------------------------
> >>> >> >> My Blog: https://www.dbtsai.com
> >>> >> >> LinkedIn: https://www.linkedin.com/in/dbtsai
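For concreteness, the driver-side call this whole thread revolves around is a one-liner (jar name taken from the quoted log):

    // Publish the jar on the driver's http file server; when distribution works,
    // each executor logs a matching "Fetching ... with timestamp ..." line.
    sc.addJar("analyticshadoop-eba5cdce1.jar")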