Hi guys,

I think it maybe a bug in Spark. I wrote some code to demonstrate the bug.

Example 1) This is how Spark adds jars. Basically, add jars to
cutomURLClassLoader.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling1.java

It doesn't work for two reasons. a) We don't pass the
customURLClassLoader to task, so it's only available in the
Executor.scala.  b) Even we do so, we need to get the class by
loader.loadClass("Class Name").newInstance(), and get the Method by
getDeclaredMethod to run it.


Example 2) It works by getting the class using loadClass API, and then
get and run the Method by getDeclaredMethod. Since we don't know which
classes users will use, it's not a solution.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling2.java


Example 3) Add jars to systemClassLoader and have them accessible in
JVM. Users can use the classes directly.

https://github.com/dbtsai/classloader-experiement/blob/master/calling/src/main/java/Calling3.java

I'm now porting example 3) to Spark, and will let you know if it works.

Thanks.

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, May 15, 2014 at 12:03 PM, DB Tsai <dbt...@stanford.edu> wrote:
> Hi Xiangrui,
>
> We're still using Spark 0.9 branch, and our job is submitted by
>
> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
>   --jar <YOUR_APP_JAR_FILE> \
>   --class <APP_MAIN_CLASS> \
>   --args <APP_MAIN_ARGUMENTS> \
>   --num-workers <NUMBER_OF_WORKER_MACHINES> \
>   --master-class <ApplicationMaster_CLASS>
>   --master-memory <MEMORY_FOR_MASTER> \
>   --worker-memory <MEMORY_PER_WORKER> \
>   --addJars <any_local_files_used_in_SparkContext.addJar>
>
>
> Based on my understanding of the code in yarn-standalone mode, the jar
> distributing from local machine to application master is through distributed
> cache (using hadoop yarn-client api). From application master to executors,
> it's through http server. I maybe wrong, but if you look at the code in
> SparkContext addJar method, you can see the jar is added to http server in
> yarn-standalone mode.
>
>             if (SparkHadoopUtil.get.isYarnMode() && master ==
> "yarn-standalone") {
>               // In order for this to work in yarn standalone mode the user
> must specify the
>               // --addjars option to the client to upload the file into the
> distributed cache
>               // of the AM to make it show up in the current working
> directory.
>               val fileName = new Path(uri.getPath).getName()
>               try {
>                 env.httpFileServer.addJar(new File(fileName))
>               } catch {
>
> Those jars will be fetched in Executor from http server and added to
> classloader of "Executor" class, see
>
>   private def updateDependencies(newFiles: HashMap[String, Long], newJars:
> HashMap[String, Long]) {
>     synchronized {
>       // Fetch missing dependencies
>       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name,
> -1L) < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentFiles(name) = timestamp
>       }
>       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L)
> < timestamp) {
>         logInfo("Fetching " + name + " with timestamp " + timestamp)
>         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
>         currentJars(name) = timestamp
>         // Add it to our class loader
>         val localName = name.split("/").last
>         val url = new File(SparkFiles.getRootDirectory,
> localName).toURI.toURL
>
>         if (!urlClassLoader.getURLs.contains(url)) {
>           urlClassLoader.addURL(url)
>         }
>       }
>
>
> The problem seems to be that jars are added to the classloader of "Executor"
> classes, and they are not accessible in Task.scala.
>
> I verified this by trying to load our custom classes in Executor.scala, and
> it works. But if I tried to load those classes in Task.scala, I'll get
> classNotFound exception.
>
> Thanks.
>
>
>
>
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, May 14, 2014 at 6:04 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>
>> In SparkContext#addJar, for yarn-standalone mode, the workers should
>> get the jars from local distributed cache instead of fetching them
>> from the http server. Could you send the command you used to submit
>> the job? -Xiangrui
>>
>> On Wed, May 14, 2014 at 1:26 AM, DB Tsai <dbt...@stanford.edu> wrote:
>> > Hi Xiangrui,
>> >
>> > I actually used `yarn-standalone`, sorry for misleading. I did debugging
>> > in
>> > the last couple days, and everything up to updateDependency in
>> > executor.scala works. I also checked the file size and md5sum in the
>> > executors, and they are the same as the one in driver. Gonna do more
>> > testing
>> > tomorrow.
>> >
>> > Thanks.
>> >
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > -------------------------------------------------------
>> > My Blog: https://www.dbtsai.com
>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>> > On Tue, May 13, 2014 at 11:41 PM, Xiangrui Meng <men...@gmail.com>
>> > wrote:
>> >>
>> >> I don't know whether this would fix the problem. In v0.9, you need
>> >> `yarn-standalone` instead of `yarn-cluster`.
>> >>
>> >> See
>> >>
>> >> https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08
>> >>
>> >> On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng <men...@gmail.com>
>> >> wrote:
>> >> > Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
>> >> > v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui
>> >> >
>> >> > On Mon, May 12, 2014 at 11:14 AM, DB Tsai <dbt...@stanford.edu>
>> >> > wrote:
>> >> >> We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add
>> >> >> jar
>> >> >> dependencies in command line with "--addJars" option. However, those
>> >> >> external jars are only available in the driver (application running
>> >> >> in
>> >> >> hadoop), and not available in the executors (workers).
>> >> >>
>> >> >> After doing some research, we realize that we've to push those jars
>> >> >> to
>> >> >> executors in driver via sc.AddJar(fileName). Although in the
>> >> >> driver's
>> >> >> log
>> >> >> (see the following), the jar is successfully added in the http
>> >> >> server
>> >> >> in the
>> >> >> driver, and I confirm that it's downloadable from any machine in the
>> >> >> network, I still get `java.lang.NoClassDefFoundError` in the
>> >> >> executors.
>> >> >>
>> >> >> 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
>> >> >> analyticshadoop-eba5cdce1.jar at
>> >> >> http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with
>> >> >> timestamp
>> >> >> 1399672301568
>> >> >>
>> >> >> Then I check the log in the executors, and I don't find anything
>> >> >> `Fetching
>> >> >> <file> with timestamp <timestamp>`, which implies something is
>> >> >> wrong;
>> >> >> the
>> >> >> executors are not downloading the external jars.
>> >> >>
>> >> >> Any suggestion what we can look at?
>> >> >>
>> >> >> After digging into how spark distributes external jars, I wonder the
>> >> >> scalability of this approach. What if there are thousands of nodes
>> >> >> downloading the jar from single http server in the driver? Why don't
>> >> >> we
>> >> >> push
>> >> >> the jars into HDFS distributed cache by default instead of
>> >> >> distributing
>> >> >> them
>> >> >> via http server?
>> >> >>
>> >> >> Thanks.
>> >> >>
>> >> >> Sincerely,
>> >> >>
>> >> >> DB Tsai
>> >> >> -------------------------------------------------------
>> >> >> My Blog: https://www.dbtsai.com
>> >> >> LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>
>

Reply via email to