Serializing the main object isn't going to help here - it's the
SparkContext it's complaining about.

The problem is the SparkContext itself: according to the code you sent,
computeDwt has a signature of:
class DWTsample ... {
    def computeDwt(sc: SparkContext, data: ArrayBuffer[(Int, Double)]): List[Double]
}

Do you need the SparkContext within that function?  That function is
executing out on your workers; they shouldn't be trying to send work
directly to other workers anyway, or using RDDs or other SparkContexts -
they should just be working with the data.  If you can eliminate the
SparkContext parameter there, you should be fine.
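For example, the class might end up looking something like this (just a
sketch - assuming the DWT computation itself only needs the data):

import scala.collection.mutable.ArrayBuffer

class DWTsample extends Serializable {
  // no SparkContext parameter - the worker only needs the data itself
  // (keeping it Serializable only matters if you share one instance; see below)
  def computeDwt(data: ArrayBuffer[(Int, Double)]): List[Double] = {
    // ... your existing DWT logic, unchanged ...
    ???  // placeholder for that logic
  }
}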

Also, I don't know how expensive a DWTsample is to produce, or whether you
need a separate instance for each record. If you do need one per record, as
the code you sent indicates, it doesn't actually have to be serializable -
you're creating it out on the worker nodes, not sending it to them from the
client node.
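In that case the per-record version is just your original map minus the sc
argument - something like this (a sketch, using the revised signature above):

val kk: RDD[(Int, List[Double])] =
  series.map(t => (t._1, new DWTsample().computeDwt(t._2)))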
If you don't need a unique instance per record, you have two options: since
the class is serializable, you can create a single instance and reuse it for
every record; or, if you'd rather it not be serializable, you can create one
instance per partition and reuse it for each record in that partition:
val kk = series.mapPartitions(iter => {
  val sampler = new DWTsample()   // one instance per partition, created on the worker
  iter.map(t => (t._1, sampler.computeDwt(t._2)))
})

(assuming you eliminated the sc parameter, of course)
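The single-shared-instance variant mentioned above would look roughly like
this (again just a sketch; it relies on DWTsample being serializable so the
one instance can be shipped out to the workers):

val sampler = new DWTsample()   // created once on the driver, serialized to the workers
val kk = series.map(t => (t._1, sampler.computeDwt(t._2)))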


Hope this helps!


On Mon, May 12, 2014 at 2:27 AM, yh18190 <yh18...@gmail.com> wrote:
>
>> Hi,
>>
>> I am facing the above exception when I try to apply a method (computeDwt)
>> on an RDD[(Int,ArrayBuffer[(Int,Double)])] input.
>> I am even using the extends Serialization option to serialize objects in
>> Spark. Here is the code snippet.
>>
>> Could anyone suggest what the problem could be and what should be done
>> to overcome this issue?
>>
>> input: series: RDD[(Int,ArrayBuffer[(Int,Double)])]
>> DWTsample extends Serialization is a class having a computeDwt function.
>> sc: SparkContext
>>
>> val kk: RDD[(Int,List[Double])] =
>>   series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))
>>
>> Error:
>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Job-failed-java-io-NotSerializableException-org-apache-spark-SparkContext-tp5585.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com
