Re: Stateful RDD

2013-12-30 Thread Bao
yeah, it makes sense. Thanks guys.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-RDD-tp71p125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: closure and ExceptionInInitializerError

2013-12-29 Thread Bao
Redocpot, I tried your 2 snippets with spark-shell and both work fine. I only
see a problem if the closure is not serializable.

scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:12

scala> val a = 1
a: Int = 1

scala> val rdd2 = rdd1.map(_ + a)
rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:16

scala> rdd2.count
13/12/30 03:50:59 INFO SparkContext: Starting job: count at <console>:19
13/12/30 03:50:59 INFO DAGScheduler: Got job 4 (count at <console>:19) with 2 output partitions (allowLocal=false)
13/12/30 03:50:59 INFO DAGScheduler: Final stage: Stage 4 (count at <console>:19)
13/12/30 03:50:59 INFO DAGScheduler: Parents of final stage: List()
13/12/30 03:50:59 INFO DAGScheduler: Missing parents: List()
13/12/30 03:50:59 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[5] at map at <console>:16), which has no missing parents
13/12/30 03:50:59 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[5] at map at <console>:16)
13/12/30 03:50:59 INFO ClusterScheduler: Adding task set 4.0 with 2 tasks
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:0 as TID 8 on executor 0: worker1 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:0 as 1839 bytes in 1 ms
13/12/30 03:50:59 INFO ClusterTaskSetManager: Starting task 4.0:1 as TID 9 on executor 1: worker2 (PROCESS_LOCAL)
13/12/30 03:50:59 INFO ClusterTaskSetManager: Serialized task 4.0:1 as 1839 bytes in 1 ms
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 8 in 152 ms on worker1 (progress: 1/2)
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 0)
13/12/30 03:51:00 INFO ClusterTaskSetManager: Finished TID 9 in 171 ms on worker2 (progress: 2/2)
13/12/30 03:51:00 INFO ClusterScheduler: Remove TaskSet 4.0 from pool
13/12/30 03:51:00 INFO DAGScheduler: Completed ResultTask(4, 1)
13/12/30 03:51:00 INFO DAGScheduler: Stage 4 (count at <console>:19) finished in 0.131 s
13/12/30 03:51:00 INFO SparkContext: Job finished: count at <console>:19, took 0.212351498 s
res5: Long = 4
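The failing case is also easy to reproduce without a cluster, since Spark serializes the closure with plain Java serialization before shipping it. A minimal sketch (`Engine` and `ClosureSerDemo` are made-up names; `Engine` stands in for any non-serializable class):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for any non-serializable class (e.g. ScriptEngine).
class Engine { def eval(x: Int): Int = x + 1 }

object ClosureSerDemo {
  // Roughly what Spark does to a closure before shipping it to an executor.
  def trySerialize(f: Int => Int): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val a = 1
    println(trySerialize(_ + a))       // closure over a plain Int: serializes fine
    val engine = new Engine
    println(trySerialize(engine.eval)) // closure captures the Engine: fails
  }
}
```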




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/closure-and-ExceptionInInitializerError-tp77p98.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Stateful RDD

2013-12-29 Thread Bao
Thanks guys, that's interesting. Though the singleton object is defined at the
driver, Spark actually serializes the closure and sends it to the workers. The
interesting thing is that ScriptEngine is NOT serializable, yet as long as it
hasn't been initialized Spark can serialize the closure fine. But if I force it
to initialize first, then Spark throws NotSerializableException.

Anyway, following Christopher's suggestion to avoid referencing state outside
the closure is better.
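That suggestion can be sketched like this (a minimal sketch, not the original code; `evalPartition` is a hypothetical helper): create the engine inside the function that runs per partition, so the shipped closure captures nothing non-serializable.

```scala
import javax.script.{ScriptEngine, ScriptEngineManager}

// Hypothetical helper: the engine is created inside the function that runs
// on the executor, so Spark never has to serialize a ScriptEngine.
def evalPartition(exprs: Iterator[String]): Iterator[AnyRef] = {
  val engine: ScriptEngine = new ScriptEngineManager().getEngineByName("JavaScript")
  exprs.map(expr => engine.eval(expr))
}

// On an RDD[String] this would be wired up as: rdd.mapPartitions(evalPartition)
```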

TD, do you mean that executors share the same SerializerInstance, and that
there are cases where more than one thread calls the same closure instance?

-Bao.
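What I saw can be reproduced with plain JVM serialization, no Spark needed (a sketch; `Engine`, `Holder`, and `LazyInitDemo` are made-up names standing in for ScriptEngine and the enclosing object): an object graph whose non-serializable field is still null serializes fine, and fails only once the field is initialized.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for ScriptEngine: a class that is not Serializable.
class Engine

// A serializable holder whose engine field starts out null (uninitialized).
class Holder extends Serializable {
  var engine: Engine = null                   // nothing non-serializable yet
  def init(): Unit = { engine = new Engine }  // now it holds an Engine
}

object LazyInitDemo {
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(serializes(h))  // true: the Engine field is still null
    h.init()
    println(serializes(h))  // false: NotSerializableException once initialized
  }
}
```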




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-RDD-tp71p97.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Stateful RDD

2013-12-27 Thread Bao
It looks like I need to use DStream instead...
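For per-key state across batches, Spark Streaming's updateStateByKey takes a function from the batch's new values and the previous state to the new state. A minimal sketch (the StreamingContext wiring is only hinted in comments; `pairs` and `ssc` are assumed names):

```scala
// The state-update function passed to updateStateByKey: combine this batch's
// values for a key with the key's previous state (None the first time a key
// is seen).
def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
  Some(newValues.sum + state.getOrElse(0))

// Sketch of the streaming wiring (untested; assumes a DStream of (key, count)
// pairs named `pairs` and a StreamingContext `ssc`):
//   ssc.checkpoint("...")    // updateStateByKey requires a checkpoint dir
//   val runningCounts = pairs.updateStateByKey(updateCount)
```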



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-RDD-tp71p85.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.