I have the same problem.
On Sat, Jul 19, 2014 at 12:31 AM, lihu <lihu...@gmail.com> wrote:
> Hi, everyone. I have the following piece of code. When I run it, it raises
> the error shown below; it seems that the SparkContext is not serializable,
> although I do not use the SparkContext anywhere except for the broadcast.
> [In fact, this code is from MLlib; I am just trying to broadcast the
> centerArrays.]
>
> It succeeds at the reduceByKey operation but fails at the collect
> operation, which confuses me.
>
> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
> not serializable: java.io.NotSerializableException:
> org.apache.spark.SparkContext
> org.apache.spark.SparkException: Job aborted: Task not serializable:
> java.io.NotSerializableException: org.apache.spark.SparkContext
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>
>   private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
>
>     @transient val sc = data.sparkContext   // I tried adding the @transient
>                                             // annotation here, but it does not help
>
>     // Initialize each run's center to a random point
>     val seed = new XORShiftRandom().nextInt()
>     val sample = data.takeSample(true, runs, seed).toSeq
>     val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>
>     // On each step, sample 2 * k points on average for each run with
>     // probability proportional to their squared distance from that run's
>     // current centers
>     for (step <- 0 until initializationSteps) {
>       val centerArrays = sc.broadcast(centers.map(_.toArray))
>       val sumCosts = data.flatMap { point =>
>         for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
>       }.reduceByKey(_ + _).collectAsMap()   // can pass at this point
>       val chosen = data.mapPartitionsWithIndex { (index, points) =>
>         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>         for {
>           p <- points
>           r <- 0 until runs
>           if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
>         } yield (r, p)
>       }.collect()   // failed at this point
>       for ((r, p) <- chosen) {
>         centers(r) += p
>       }
>     }
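From what I understand, the usual cause of this error is not the broadcast itself. A Scala closure that reads a field of the enclosing class (in the quoted code, runs, k and initializationSteps belong to the KMeans instance) captures the whole enclosing object, and everything reachable from that object must then be serializable, including any SparkContext it holds. The standard workaround is to copy such fields into local vals right before the closure. Below is a minimal, self-contained sketch of that pattern; the Scaler class and its fields are hypothetical and only illustrate the capture problem, they are not from MLlib:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

class Scaler(val sc: SparkContext, factor: Int) extends Serializable {

  // Fails with NotSerializableException: org.apache.spark.SparkContext.
  // "factor" is really "this.factor", so the closure captures "this",
  // and with it the sc field of the enclosing instance.
  def scaleCapturing(data: RDD[Int]): Array[Int] =
    data.map(_ * factor).collect()

  // Works: copying the field into a local val first means the closure
  // serializes only an Int and never touches the enclosing instance.
  def scaleLocal(data: RDD[Int]): Array[Int] = {
    val localFactor = factor
    data.map(_ * localFactor).collect()
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("capture-demo").setMaster("local[2]"))
    val scaler = new Scaler(sc, 3)
    println(scaler.scaleLocal(sc.parallelize(1 to 5)).mkString(","))   // 3,6,9,12,15
    sc.stop()
  }
}

Applied to the quoted code, that would mean assigning runs and k (and the centerArrays broadcast handle) to local vals just before the mapPartitionsWithIndex call, so the closure no longer drags in the enclosing KMeans instance. Marking the local sc as @transient does not help here, because @transient only affects fields during serialization, not which objects a closure captures.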