Re: closure and ExceptionInInitializerError

2013-12-30 Thread redocpot
Thank you for your reply, Bao.

Actually, I am running the code on EC2 with SBT: 1 master and 3 slaves.

In order to reproduce the issue, I just created a directory with one Scala
source file and a lib directory where the assembly jar is.

$ sbt package run

will run the project.
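
My build.sbt is minimal; as a sketch, the name, version and Scala version
below are only what the jar path target/scala-2.9.3/test_2.9.3-0.1.jar
suggests, and the Spark assembly jar in lib/ is picked up as an unmanaged
dependency:

// build.sbt (assumed from the jar path; nothing else needed)
name := "test"

version := "0.1"

scalaVersion := "2.9.3"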

The Scala file is shown below:


// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous extends App {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort,
    "Test",
    System.getenv("SPARK_HOME"),
    Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4)) 
  val a = 1 

  def run() {
val rdd2 = rdd1.map(_ + a)
println(rdd2.count)
  }

  run 
}


The code works, which may explain why it works in the shell.
But if I change it into an object with a main method, instead of using
extends App, like this:


// job.scala
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous {
  val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
  val sparkPort = 7077
  val sc = new SparkContext("spark://" + nameNodeURL + ":" + sparkPort,
    "Test",
    System.getenv("SPARK_HOME"),
    Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
  val rdd1 = sc.parallelize(List(1, 2, 3, 4)) 
  val a = 1 

  def run() {
val rdd2 = rdd1.map(_ + a)
println(rdd2.count)
  }

  def main(args: Array[String]){
run
  } 
}


then two exceptions are thrown:

13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to
java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
at anonymous$$anonfun$1.apply(job.scala:13)
at anonymous$$anonfun$1.apply(job.scala:13)
at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as
TID 2 on executor 0: ip-10-202-35-76.ec2.internal (PROCESS_LOCAL)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0
as 1351 bytes in 1 ms
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task
0.0:0)
13/12/30 10:08:17 INFO cluster.ClusterTaskSetManager: Loss was due to
java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: Could not initialize class anonymous$
at anonymous$$anonfun$1.apply$mcII$sp(job.scala:13)
at anonymous$$anonfun$1.apply(job.scala:13)
at anonymous$$anonfun$1.apply(job.scala:13)
at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:681)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:677)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

The two snippets are doing the same thing, right?
So what's the difference? Any thoughts?
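
For now, a workaround that seems to avoid the error is to keep everything
inside main, so the map closure does not depend on any field of the object.
A sketch only (same setup as above, not tested beyond my small cluster):

// job.scala -- workaround sketch: no object-level fields
import org.apache.spark.SparkContext
import scala.io.Source

object anonymous {
  def main(args: Array[String]) {
    val nameNodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
    val sc = new SparkContext("spark://" + nameNodeURL + ":7077",
      "Test",
      System.getenv("SPARK_HOME"),
      Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))
    val rdd1 = sc.parallelize(List(1, 2, 3, 4))
    val a = 1 // local value, so the closure captures only an Int
    println(rdd1.map(_ + a).count)
  }
}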

Thank you.

Hao.





Re: closure and ExceptionInInitializerError

2013-12-29 Thread Bao
Redocpot, I tried your two snippets with spark-shell and both work fine. I only
see a problem if the closure is not serializable.
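
For example, with an rdd1 like in the transcript below, something like this
(an illustrative sketch, not your code) drags a non-serializable object into
the closure, so the task cannot be serialized:

class Holder(val a: Int)        // note: not Serializable

val h = new Holder(1)
val rdd2 = rdd1.map(_ + h.a)    // the closure captures h
rdd2.count                      // fails with a NotSerializableException at task serialization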

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at
parallelize at <console>:12

scala> val a = 1
a: Int = 1

scala> val rdd2 = rdd1.map(_ + a)
rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:16

scala> rdd2.count
13/12/30 03:50:59 INFO SparkContext: Starting job: count at <console>:19
13/12/30 03:50:59 INFO DAGScheduler: Got job 4 (count at <console>:19) with
2 output partitions (allowLocal=false)
13/12/30 03:50:59 INFO DAGScheduler: Final stage: Stage 4 (count at
<console>:19)
13/12/30 03:50:59 INFO DAGScheduler: Parents of final stage: List()
13/12/30 03:50:59 INFO DAGScheduler: Missing parents: List()
13/12/30 03:50:59 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[5] at map
at <console>:16), which has no missing parents
13/12/30 03:50:59 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4
(MappedRDD[5] at map at <console>:16)
13/12/30 03:50:59 INFO ClusterScheduler: Adding task set 4.0 with 2 tasks
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:0 as TID 8
on executor 0: worker1 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:0 as 1839
bytes in 1 ms
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:1 as TID 9
on executor 1: worker2 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:1 as 1839
bytes in 1 ms
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 8 in 152 ms on
worker1 (progress: 1/2)
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 0)
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 9 in 171 ms on
worker2 (progress: 2/2)
13/12/30 03:51:00 INFO ClusterScheduler: Remove TaskSet 4.0 from pool
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 1)
13/12/30 03:51:00 INFO DAGScheduler: Stage 4 (count at <console>:19)
finished in 0.131 s
13/12/30 03:51:00 INFO SparkContext: Job finished: count at <console>:19,
took 0.212351498 s
res5: Long = 4






closure and ExceptionInInitializerError

2013-12-27 Thread redocpot
Hi,

While playing with Spark, I encountered an ExceptionInInitializerError. I am
confused about the closures that Spark uses.

For example, I have two Scala files in my project:

// Entry.scala
object Entry {
  def main(args: Array[String]) = {
    anonymous.run()
  }
}

// anonymous.scala
object anonymous {
  val rdd1 = sparkContext.parallelize(List(1, 2, 3, 4))
  val a = 1

  def run() {
    val rdd2 = rdd1.map(_ + a)
    println(rdd2.count)
  }
}

The code above will give an ExceptionInInitializerError which says:
java.lang.ExceptionInInitializerError
at job.analysis.anonymous$.<init>(anonymous.scala:37)
at job.analysis.anonymous$.<clinit>(anonymous.scala)
...
java.lang.NoClassDefFoundError: Could not initialize class
job.analysis.anonymous$

But if I move val a = 1 into the run() function, it works. So I think
the problem is object initialization.

The object anonymous is only created in the main process, not in the
worker processes; so when the workers want to access val a, which is
defined as a field of anonymous, they cannot find the initialized object.
Correct me if I am wrong.
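
If that guess is right, copying the field into a local value before the map
should also work, since the closure would then capture only an Int and never
touch the object on the workers. A sketch only, which I have not verified
thoroughly:

def run() {
  val localA = a                   // read the field on the driver side
  val rdd2 = rdd1.map(_ + localA)  // the closure now captures only an Int
  println(rdd2.count)
}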

I have also tested this a bit more:

// snippet 1
def run() = {
  class t extends Serializable {
    val a = 2
  }
  val tt = new t()
  val rdd2 = rdd1.map(_ + tt.a)
  println(rdd2.count)
} // this one works

// snippet 2
def run() = {
  object t extends Serializable {
    val a = 2
  }
  val rdd2 = rdd1.map(_ + t.a)
  println(rdd2.count)
} // I encountered the same ExceptionInInitializerError

This seems strange to me, because in snippet 1 the instance tt is created in
the main process just as in snippet 2; the only difference is that in
snippet 2, t is a standalone object.

Could someone explain why this occurs? Maybe it's just a basic question, but
it really confuses me. Am I missing something here?

Hao


