Hi there,
I'm trying to do the following, but it always throws
java.io.NotSerializableException in the shuffle task.
I've checked that Array is serializable. How can I get the data of rdd in
newRDD?

step 1: val rdd: RDD[(AnyRef, Array[AnyRef])] {......}

step 2: rdd
          .partitionBy(partitioner)
          .map(_._2)

step 3: pass rdd to newRDD as prev:
class newRDD[K, V](
    xxx,
    xxx,
    xxx,
    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {

override protected def getPartitions: Array[Partition] = {...}

override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {...
      val rows = firstParent[Array[AnyRef]].iterator(split, context)

   }

}
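
To narrow this down, a check along the following lines might help. It is only a minimal sketch: it assumes the shuffle uses plain Java serialization, and the names SerializationCheck and firstSerializationFailure are made up for illustration, not part of Spark. The idea is that an Array[AnyRef] being serializable says nothing about its elements, so the helper tries to serialize a sample value and reports what fails.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper (not a Spark API): tries plain Java serialization on a value.
object SerializationCheck {
  // Returns None if `value` serializes cleanly; otherwise returns the exception
  // message, which by default is the name of the class that is not serializable.
  def firstSerializationFailure(value: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Running it on a few sample keys and rows from rdd (for example the output of rdd.take(...)), and on the partitioner itself, should show which class the shuffle is failing on.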


Thanks,
LL
