Hi there, I'm trying to run the steps below, but they always fail with java.io.NotSerializableException in the shuffle task. I've checked that Array is serializable. How can I get the data of rdd into newRDD?
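A note on the Array check, as a minimal sketch that assumes the job uses Spark's default Java serialization for the shuffle: an Array[AnyRef] can be Java-serialized only if every element it holds can be, so "Array is serializable" does not guarantee that a given row is. Serializing a sample key or row with plain java.io, outside Spark, is one way to find the offending element. The names SerializationCheck, trySerialize, firstNonSerializable and PlainHolder are hypothetical and used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical helper: Java-serializes a value the way java.io does and
  // returns either the failure message (the offending class name) or the byte count.
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(value)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }

  // Hypothetical helper: index of the first element in a row that fails to serialize.
  def firstNonSerializable(row: Array[AnyRef]): Option[Int] =
    row.indices.find(i => trySerialize(row(i)).isLeft)
}

// Hypothetical stand-in for a row element that does not implement java.io.Serializable.
class PlainHolder(val value: Int)

object Demo {
  def main(args: Array[String]): Unit = {
    val good: Array[AnyRef] = Array("a", Integer.valueOf(1))
    val bad: Array[AnyRef] = Array("a", new PlainHolder(2))
    println(SerializationCheck.trySerialize(good).isRight) // true
    println(SerializationCheck.firstNonSerializable(bad))  // Some(1)
  }
}
```

Running the same check on a key (the AnyRef in each pair) is worthwhile too, since keys are serialized in the shuffle as well.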
Step 1:

  import org.apache.spark.{Partition, TaskContext}
  import org.apache.spark.rdd.RDD

  val rdd: RDD[(AnyRef, Array[AnyRef])] = {......}

Step 2:

  rdd.partitionBy(partitioner).map(_._2)

Step 3: pass rdd to newRDD as prev:

  class newRDD[K, V](xxx, xxx, xxx, prev: RDD[Array[AnyRef]])
      extends RDD[(K, V)](prev) {

    override protected def getPartitions: Array[Partition] = {...}

    override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
      ...
      val rows = firstParent[Array[AnyRef]].iterator(split, context)
    }
  }

Thanks,
LL