Hi Folks,
Does anybody know the reason for not allowing preservesPartitioning in
RDD.map? Am I missing something here?
The following example involves two shuffles. I think if preservesPartitioning
were allowed, we could avoid the second one, right?
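The example itself is cut off in the archive, but a chain with two shuffles of the kind the question describes might look like this (a sketch; the values are hypothetical):

```scala
// The plain map in the middle drops the partitioner even though the keys
// are untouched, so the second reduceByKey has to shuffle again.
val r1 = sc.parallelize(List(1, 2, 3, 4, 2, 4)).map((_, 1))
val r2 = r1.reduceByKey(_ + _)                 // shuffle #1
val r3 = r2.map { case (k, v) => (k, v + 1) }  // partitioner is now None
val r4 = r3.reduceByKey(_ + _)                 // shuffle #2, avoidable in principle
```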
I believe if you do the following:
sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []

Note there is only one ShuffledRDD in the lineage: the second reduceByKey does
not shuffle, because mapValues preserves the partitioner.
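Another way to see why the second shuffle disappears is to inspect the partitioner directly in the REPL (a sketch; the input values are arbitrary):

```scala
val byKey = sc.parallelize(List(1, 2, 3, 4, 2, 4)).map((_, 1)).reduceByKey(_ + _)
byKey.partitioner                                    // Some(HashPartitioner)
byKey.mapValues(_ + 1).partitioner                   // unchanged: mapValues preserves it
byKey.map { case (k, v) => (k, v + 1) }.partitioner  // None: map discards it
```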
Thanks Jonathan. You are right about rewriting the example.
I meant providing such an option to developers so that it is controllable. The
example may seem silly, and I don't know all the use cases.
But, for example, what if I also want to operate on both the key and the value
to generate some new value?
What exists today:

def mapValues[U](f: V => U): RDD[(K, U)] = {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

What you want:

def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case t @ (k, _) => (k, cleanF(t)) },
    preservesPartitioning = true)
}
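For what it's worth, the same effect is reachable today through the public mapPartitions, since it already exposes the preservesPartitioning flag. A sketch, where pairs and f are placeholders for your own pair RDD and key-aware function:

```scala
// pairs: RDD[(K, V)] and f: (K, V) => U stand in for your own values.
val result = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, f(k, v)) },
  preservesPartitioning = true)  // we promise the keys are unchanged
```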
Hi All,
The function *mapPartitions* in RDD.scala
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala)
takes a boolean parameter *preservesPartitioning*. It seems that if that
parameter is passed as *false*, the passed function f will operate on the data only
Hi Kamal,
This is not what preservesPartitioning does -- actually what it means is that
if the RDD has a Partitioner set (which means it's an RDD of key-value pairs
and the keys are grouped in a known way, e.g. hashed or range-partitioned),
your map function is not changing the partition each key belongs to, so the
resulting RDD can keep the same Partitioner.
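Worth noting: the flag is a promise, not something Spark verifies. A sketch of how it can go wrong if the promise is broken (byKey is assumed to be some hash-partitioned pair RDD, e.g. the output of a reduceByKey):

```scala
// If the function changes keys while still claiming preservesPartitioning,
// downstream stages that trust the partitioner (e.g. a reduceByKey that
// skips the shuffle) will silently compute wrong results.
val broken = byKey.mapPartitions(
  iter => iter.map { case (k, v) => (k + 1, v) },  // keys changed...
  preservesPartitioning = true)                    // ...but we claimed otherwise
```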