Re: closure and ExceptionInInitializerError
Thank you for your reply, Bao.

Actually, I am running the code on EC2 with SBT, with 1 master and 3 slaves. In order to reproduce the issue, I just created a directory with one Scala source file and a lib directory where the assembly jar is. `$ sbt package run` will run the project. The Scala file is shown below:

// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous extends App {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort, "Test",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }

  run()
}

This code works, which may explain why it works in the shell. But if I change it into an object with a main method, instead of using extends App, like:

// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort, "Test",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }

  def main(args: Array[String]) {
    run()
  }
}

then two exceptions are thrown:

13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
        at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 0: ip-10-202-35-76.ec2.internal (PROCESS_LOCAL)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1351 bytes in 1 ms
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: Could not initialize class anonymous$
        at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at anonymous$$anonfun$1.apply(job.scala:13)
        at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
        at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

The two snippets are doing the same thing, right? So what's the difference? Any thoughts?

Thank you.

Hao
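P.S. The workaround I have in mind, if my guess about object initialization is right, is to copy the object's field into a local val inside run(), so the serialized closure captures a plain value instead of a reference back to the anonymous$ singleton, and the workers never have to run the object's initializer (which is what would construct the SparkContext on an executor). A minimal, untested sketch of the second snippet with that one change:

// job.scala -- sketch: keep workers from initializing the `anonymous` object
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort, "Test",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val localA = a                   // copy the field into a local val on the driver
    val rdd2 = rdd1.map(_ + localA)  // the closure now serializes a plain Int, not a
    println(rdd2.count)              // reference to anonymous$, so no <clinit> on workers
  }

  def main(args: Array[String]) {
    run()
  }
}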
Re: closure and ExceptionInInitializerError
Redocpot,

I tried your 2 snippets with spark-shell and both work fine. I only see a problem if the closure is not serializable.

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:12

scala> val a = 1
a: Int = 1

scala> val rdd2 = rdd1.map(_ + a)
rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:16

scala> rdd2.count
13/12/30 03:50:59 INFO SparkContext: Starting job: count at <console>:19
13/12/30 03:50:59 INFO DAGScheduler: Got job 4 (count at <console>:19) with 2 output partitions (allowLocal=false)
13/12/30 03:50:59 INFO DAGScheduler: Final stage: Stage 4 (count at <console>:19)
13/12/30 03:50:59 INFO DAGScheduler: Parents of final stage: List()
13/12/30 03:50:59 INFO DAGScheduler: Missing parents: List()
13/12/30 03:50:59 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[5] at map at <console>:16), which has no missing parents
13/12/30 03:50:59 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[5] at map at <console>:16)
13/12/30 03:50:59 INFO ClusterScheduler: Adding task set 4.0 with 2 tasks
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:0 as TID 8 on executor 0: worker1 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:0 as 1839 bytes in 1 ms
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:1 as TID 9 on executor 1: worker2 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:1 as 1839 bytes in 1 ms
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 8 in 152 ms on worker1 (progress: 1/2)
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 0)
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 9 in 171 ms on worker2 (progress: 2/2)
13/12/30 03:51:00 INFO ClusterScheduler: Remove TaskSet 4.0 from pool
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 1)
13/12/30 03:51:00 INFO DAGScheduler: Stage 4 (count at <console>:19) finished in 0.131 s
13/12/30 03:51:00 INFO SparkContext: Job finished: count at <console>:19, took 0.212351498 s
res5: Long = 4
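P.S. To illustrate what I mean by a non-serializable closure, a minimal sketch (untested; Holder is a made-up name, not from your code): when the lambda reads a field, it captures `this` of the enclosing class, and if that class is not Serializable, task serialization fails with a NotSerializableException.

import org.apache.spark.rdd.RDD

// Made-up wrapper class for illustration; note it does NOT extend Serializable.
class Holder {
  val a = 1

  def run(rdd: RDD[Int]): Long = {
    // `a` is a field, so the lambda captures `this` (a Holder);
    // serializing the task then tries to serialize the Holder and
    // fails with java.io.NotSerializableException: Holder
    rdd.map(_ + a).count()
  }
}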
closure and ExceptionInInitializerError
Hi,

When playing with Spark, I encountered an ExceptionInInitializerError. I am confused by the closures which Spark uses. For example, I have 2 Scala files in my project:

// Entry.scala
object Entry {
  def main(args: Array[String]) = {
    anonymous.run()
  }
}

// anonymous.scala
object anonymous {
  val rdd1 = sparkContext.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }
}

The code above gives an ExceptionInInitializerError which says:

java.lang.ExceptionInInitializerError
        at job.analysis.anonymous$.<init>(anonymous.scala:37)
        at job.analysis.anonymous$.<clinit>(anonymous.scala)
        ...
java.lang.NoClassDefFoundError: Could not initialize class job.analysis.anonymous$

But if I move val a = 1 into the run() function, it works. So I think the problem is the object initialization: the object anonymous is only created in the main (driver) process, not in the worker processes, so when the workers try to access val a, which is defined as an attribute of anonymous, they cannot find an initialized object. Correct me if I am wrong.

I have also tested it a bit more:

// snippet 1
def run() = {
  class t extends Serializable {
    val a = 2
  }
  val tt = new t()
  val rdd2 = rdd1.map(_ + tt.a)
  println(rdd2.count)
}
// this one works

// snippet 2
def run() = {
  object t extends Serializable {
    val a = 2
  }
  val rdd2 = rdd1.map(_ + t.a)
  println(rdd2.count)
}
// I encountered the same ExceptionInInitializerError

It seems strange to me, because in snippet 1 the instance tt is created in the main process, just as in snippet 2, except that in snippet 2, t is a standalone object. Could someone explain why this occurs? Maybe it's just a basic question, but it really confuses me. Am I missing anything here?

Hao
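P.S. For reference, here is the variant that does work for me, with my guess at why in the comments (a sketch of my understanding only, not verified against the compiled output):

// anonymous.scala -- working variant: `a` moved inside run()
object anonymous {
  // sparkContext defined elsewhere, as in the original snippet
  val rdd1 = sparkContext.parallelize(List(1, 2, 3, 4))

  def run() {
    val a = 1                   // a local val: the closure captures the Int value
    val rdd2 = rdd1.map(_ + a)  // itself rather than a reference to the anonymous$
    println(rdd2.count)         // singleton, so workers never run its initializer
  }
}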