Hi everyone,

I have the following piece of code. When I run it, I get the error below. It seems that the SparkContext is not serializable, but I don't use the SparkContext anywhere except for the broadcast. (This code is actually from MLlib; I'm just trying to broadcast centerArrays.)

The first job, reduceByKey(...).collectAsMap(), succeeds, but the collect() in the next step fails, and this confuses me.


INFO DAGScheduler: Failed to run collect at KMeans.scala:235
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
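The trace says that Java serialization of the task closure reached an object of class org.apache.spark.SparkContext. As a rough sketch of that check outside Spark (plain Java serialization only; SerializationCheck and serializationFailure are hypothetical names, not Spark API), the following reports the first non-serializable class it runs into:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical helper: tries to Java-serialize `obj`. Returns None on success,
  // or Some(className) when a NotSerializableException is thrown; the exception
  // message is the name of the first non-serializable class that was reached.
  def serializationFailure(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // prints None: a primitive array serializes fine
    println(serializationFailure(Array(1.0, 2.0)))
    // prints Some(java.lang.Object): java.lang.Object is not Serializable
    println(serializationFailure(new Object))
  }
}
```

On OpenJDK/HotSpot JVMs, running the driver with -Dsun.io.serialization.extendedDebugInfo=true may make the exception message include the chain of fields that led to the non-serializable object, which can help locate where the SparkContext reference comes from.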




private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

    @transient val sc = data.sparkContext  // I tried adding the @transient annotation here, but it doesn't help

    // Initialize each run's center to a random point
    val seed = new XORShiftRandom().nextInt()
    val sample = data.takeSample(true, runs, seed).toSeq
    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

    // On each step, sample 2 * k points on average for each run with probability proportional
    // to their squared distance from that run's current centers
    for (step <- 0 until initializationSteps) {
      val centerArrays = sc.broadcast(centers.map(_.toArray))
      val sumCosts = data.flatMap { point =>
        for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
      }.reduceByKey(_ + _).collectAsMap()  // this job succeeds
      val chosen = data.mapPartitionsWithIndex { (index, points) =>
        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
        for {
          p <- points
          r <- 0 until runs
          if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
        } yield (r, p)
      }.collect()  // this job fails with "Task not serializable"
      for ((r, p) <- chosen) {
        centers(r) += p
      }
    }
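A common source of this error is a closure capturing more than it appears to; whether that is what happens in the code above is not confirmed here. The sketch below assumes Scala 2.12 or later (function literals compile to serializable lambdas that capture `this` only when the body needs it) and uses no Spark. It shows how reading a class field inside a closure drags the whole enclosing instance into serialization, and two ways around it: copying the field into a local val, or marking the non-serializable field @transient. Driver, TransientDriver and serializes are hypothetical names. Note that @transient tells Java serialization to skip a field, so it is applied to a field here, not to a local val like sc in the post.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

object CaptureDemo {
  // True if `obj` survives Java serialization, roughly the check behind
  // Spark's "Task not serializable" error.
  def serializes(obj: AnyRef): Boolean =
    Try(new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)).isSuccess

  // Hypothetical driver-side class: Serializable itself, but holding a field
  // (a plain java.lang.Object standing in for something like a SparkContext)
  // that is not.
  class Driver(val k: Int) extends Serializable {
    val handle = new Object

    // Reading the field `k` makes the closure capture `this`, and `handle` with it.
    def capturingClosure: Int => Int = x => x * k

    // Copying the field into a local val first: the closure captures only an Int.
    def localClosure: Int => Int = {
      val localK = k
      x => x * localK
    }
  }

  // Same shape, but the non-serializable field is @transient, so Java
  // serialization skips it even though the closure still captures `this`.
  class TransientDriver(val k: Int) extends Serializable {
    @transient val handle = new Object
    def capturingClosure: Int => Int = x => x * k
  }

  def main(args: Array[String]): Unit = {
    val d = new Driver(3)
    println(serializes(d.capturingClosure))                       // false
    println(serializes(d.localClosure))                           // true
    println(serializes(new TransientDriver(3).capturingClosure))  // true
  }
}
```

Copying fields into local vals keeps the closure's captured state small and explicit, which is usually easier to reason about than relying on @transient.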
