Which code did you use? Is the error caused by your own code or by something in Spark itself?
On Tue, Jul 22, 2014 at 8:50 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> I have the same problem.
>
> On Sat, Jul 19, 2014 at 12:31 AM, lihu <lihu...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I have the following piece of code. When I run it, the error below occurs. It seems that the SparkContext is not serializable, but I do not use the SparkContext anywhere except for the broadcast. [In fact, this code is from MLlib; I only changed it to broadcast centerArrays.]
>>
>> It succeeds in the reduceByKey operation but fails at the collect operation, which confuses me.
>>
>> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
>> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>
>> private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
>>   @transient val sc = data.sparkContext // I tried adding the @transient annotation here, but it doesn't help
>>
>>   // Initialize each run's center to a random point
>>   val seed = new XORShiftRandom().nextInt()
>>   val sample = data.takeSample(true, runs, seed).toSeq
>>   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>>
>>   // On each step, sample 2 * k points on average for each run with probability proportional
>>   // to their squared distance from that run's current centers
>>   for (step <- 0 until initializationSteps) {
>>     val centerArrays = sc.broadcast(centers.map(_.toArray))
>>     val sumCosts = data.flatMap { point =>
>>       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
>>     }.reduceByKey(_ + _).collectAsMap() // succeeds at this point
>>     val chosen = data.mapPartitionsWithIndex { (index, points) =>
>>       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>>       for {
>>         p <- points
>>         r <- 0 until runs
>>         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
>>       } yield (r, p)
>>     }.collect() // fails at this point
>>     for ((r, p) <- chosen) {
>>       centers(r) += p
>>     }
>>   }
>
> --
> Best Wishes!
> Li Hu (李浒) | Graduate Student
> Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
> Tsinghua University, China
> Email: lihu...@gmail.com
> Tel: +86 15120081920
> Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
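One common way a job ends up serializing an object it never mentions directly: a closure that reads a member field captures the whole enclosing instance, including any non-serializable fields that instance holds. The sketch below is plain Scala with no Spark, assuming Scala 2.12 or later (where function literals compile to serializable lambdas). `FakeContext`, `Job`, and `ClosureDemo` are hypothetical stand-ins, not Spark or MLlib classes, and this only illustrates a likely mechanism, not a confirmed diagnosis of the failure above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable
class FakeContext

// Hypothetical stand-in for a Serializable class (like KMeans) that holds a context field
class Job(val k: Int) extends Serializable {
  val ctx = new FakeContext

  // Reads the field `k`, so the lambda captures `this` and, with it, `ctx`
  def badClosure: Int => Int = x => x * k

  // Copies `k` into a local first, so the lambda captures only an Int
  def goodClosure: Int => Int = {
    val localK = k
    x => x * localK
  }
}

object ClosureDemo {
  // Returns true if Java serialization of `obj` succeeds
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job(3)
    println(canSerialize(job.badClosure))  // false: fails on FakeContext
    println(canSerialize(job.goodClosure)) // true
  }
}
```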