I have the same problem.

On Sat, Jul 19, 2014 at 12:31 AM, lihu <lihu...@gmail.com> wrote:

> Hi, everyone.
>     I have the following piece of code. When I run it, it fails with the
> error shown below. It seems that the SparkContext is not serializable, even
> though I do not use the SparkContext anywhere except for the broadcast.
>     [In fact, this code is from MLlib; I am just trying to broadcast the
> centerArrays.]
>
>     It succeeds in the reduceByKey operation but fails at the collect
> operation, which confuses me.
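
As far as I can tell, this error usually means that the closure passed to the
RDD operation captured the enclosing object rather than the SparkContext
directly: referencing any field of a class inside a closure pulls in `this`,
and with it every other field, including a SparkContext. A minimal sketch of
the trap (the class `Holder` and its field `factor` are my own illustration,
not taken from the code below):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical illustration: the closure only needs `factor`, but
    // reading a field means serializing `this`, and `this.sc` is a
    // SparkContext, which is not serializable.
    class Holder(val sc: SparkContext) extends Serializable {
      val factor = 2
      def scale(data: RDD[Int]): Array[Int] =
        data.map(_ * factor).collect()
        // throws NotSerializableException: org.apache.spark.SparkContext
    }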
>
>
> INFO DAGScheduler: Failed to run collect at KMeans.scala:235
> [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
> org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>
> private def initKMeansParallel(data: RDD[Array[Double]]):
> Array[ClusterCenters] = {
>
>     // I tried adding the @transient annotation here, but it does not help
>     @transient val sc = data.sparkContext
>
>     // Initialize each run's center to a random point
>     val seed = new XORShiftRandom().nextInt()
>     val sample = data.takeSample(true, runs, seed).toSeq
>     val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
>
>     // On each step, sample 2 * k points on average for each run, with
>     // probability proportional to their squared distance from that run's
>     // current centers
>     for (step <- 0 until initializationSteps) {
>       val centerArrays = sc.broadcast(centers.map(_.toArray))
>       val sumCosts = data.flatMap { point =>
>         for (r <- 0 until runs)
>           yield (r, KMeans.pointCost(centerArrays.value(r), point))
>       }.reduceByKey(_ + _).collectAsMap()   // this collectAsMap() succeeds
>       val chosen = data.mapPartitionsWithIndex { (index, points) =>
>         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
>         for {
>           p <- points
>           r <- 0 until runs
>           if rand.nextDouble() <
>             KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
>         } yield (r, p)
>       }.collect()                           // this collect() fails as above
>       for ((r, p) <- chosen) {
>         centers(r) += p
>       }
>     }
>
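
One possible cause, for what it's worth: `runs`, `k`, and
`initializationSteps` look like fields of the enclosing class (they are in
MLlib's KMeans), so the closures reference `this`, and Spark then has to
serialize the whole instance, SparkContext included. A @transient annotation
on the local `sc` cannot help with that. The usual workaround is to copy each
field the closure needs into a local val first, so that only plain values are
captured. A sketch against the failing block above (untested, and assuming
`runs` and `k` really are instance fields):

    // Local copies: the closure now captures localRuns and localK
    // instead of `this` (and therefore no SparkContext).
    val localRuns = runs
    val localK = k
    val chosen = data.mapPartitionsWithIndex { (index, points) =>
      val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
      for {
        p <- points
        r <- 0 until localRuns
        if rand.nextDouble() <
          KMeans.pointCost(centerArrays.value(r), p) * 2 * localK / sumCosts(r)
      } yield (r, p)
    }.collect()

Running the driver with -Dsun.io.serialization.extendedDebugInfo=true also
makes the JVM print the chain of fields that dragged the SparkContext in,
which helps confirm which closure is at fault.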
