​Which code do you used, do you caused by your own code or something in
spark itself?


On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> I have the same problem
>
>
> On Sat, Jul 19, 2014 at 12:31 AM, lihu <lihu...@gmail.com> wrote:
>
>> Hi,
>>     Everyone.  I have a piece of following code. When I run it,
>> it occurred the error just like below, it seem that the SparkContext is not
>> serializable, but i do not try to use the SparkContext except the broadcast.
>>     [In fact, this code is in the MLLib, I just try to broadcast the
>>  centerArrays ]
>>
>>     it can success in the redeceBykey operation, but failed at the
>> collect operation, this confused me.
>>
>>
>> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
>> not serializable: java.io.NotSerializableException:
>> org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>  at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>
>>
>>
>>
>> private def initKMeansParallel(data: RDD[Array[Double]]):
>> Array[ClusterCenters] = {
>>
>>     @transient val sc = data.sparkContext           // I try to add the 
>> transient
>> annotation here, but it doesn't work
>>
>>     // Initialize each run's center to a random point
>>     val seed = new XORShiftRandom().nextInt()
>>     val sample = data.takeSample(true, runs, seed).toSeq
>>     val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>>
>>     // On each step, sample 2 * k points on average for each run with
>> probability proportional
>>     // to their squared distance from that run's current centers
>>     for (step <- 0 until initializationSteps) {
>>       val centerArrays = sc.broadcast(centers.map(_.toArray))
>>       val sumCosts = data.flatMap { point =>
>>         for (r <- 0 until runs) yield (r,
>> KMeans.pointCost(centerArrays.value(r), point))
>>       }.reduceByKey(_ + _).collectAsMap()
>>                                       //can pass at this point
>>       val chosen = data.mapPartitionsWithIndex { (index, points) =>
>>         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>>         for {
>>           p <- points
>>           r <- 0 until runs
>>           if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r),
>> p) * 2 * k / sumCosts(r)
>>         } yield (r, p)
>>       }.collect()
>>                                                       // failed at this
>> point.
>>       for ((r, p) <- chosen) {
>>         centers(r) += p
>>       }
>>     }
>>
>>
>>
>>
>>
>


-- 
*Best Wishes!*

 *Li Hu(李浒) | Graduate Student*

*Institute for Interdisciplinary Information Sciences(IIIS
<http://iiis.tsinghua.edu.cn/>) *
*Tsinghua University, China*

*Email: lihu...@gmail.com <lihu...@gmail.com>*
*Tel  : +86 15120081920*
*Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
<http://iiis.tsinghua.edu.cn/zh/lihu/>*

Reply via email to