RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Hi Folks, Does anybody know the reason for not allowing preservesPartitioning in RDD.map? Am I missing something here? The following example involves two shuffles. I think that if preservesPartitioning were allowed, we could avoid the second one, right? val r1 =
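The truncated example can be sketched as follows (a hypothetical reconstruction, not Zhan's original code; it assumes a live SparkContext `sc`, and the names `r1`, `r2`, `viaMap`, `viaMapValues` are illustrative):

```scala
// Build a pair RDD and reduce it; reduceByKey shuffles and sets a HashPartitioner.
val r1 = sc.parallelize(1 to 100).map(i => (i % 10, i))
val r2 = r1.reduceByKey(_ + _)                 // shuffle #1

// RDD.map gives no way to declare the partitioning preserved, so the
// partitioner is dropped and the following reduceByKey must shuffle again:
val viaMap = r2.map { case (k, v) => (k, v + 1) }.reduceByKey(_ + _)   // shuffle #2

// mapValues keeps the partitioner, so here the second reduceByKey needs no shuffle:
val viaMapValues = r2.mapValues(_ + 1).reduceByKey(_ + _)
```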

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
I believe if you do the following: sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString (8) MapPartitionsRDD[34] at reduceByKey at &lt;console&gt;:23 [] | MapPartitionsRDD[33] at mapValues at &lt;console&gt;:23 [] | ShuffledRDD[32] at

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
Thanks Jonathan. You are right about rewriting the example. I meant providing such an option to developers so that it is controllable. The example may seem silly, and I don’t know the use cases. But for example, if I also want to operate on both the key and the value part to generate some new value

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Jonathan Coveney
def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, preservesPartitioning = true) } What you want: def mapValues[U](f: (K, V) => U): RDD[(K
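The snippet quotes the `MapPartitionsRDD` constructor, which is `private[spark]` and not callable from user code. A sketch of the same idea using only the public `mapPartitions` API (the helper name `mapValuesWithKey` is made up for illustration):

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Like mapValues, but f also sees the key. Since the keys are untouched,
// it is safe to tell Spark that the parent's partitioning still holds.
def mapValuesWithKey[K, V, U: ClassTag](rdd: RDD[(K, V)])(f: (K, V) => U): RDD[(K, U)] =
  rdd.mapPartitions(
    iter => iter.map { case (k, v) => (k, f(k, v)) },
    preservesPartitioning = true)
```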

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Patrick Wendell
)) }, preservesPartitioning = true) } What you want: def mapValues[U](f: (K, V) => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t

Re: RDD.map does not allowed to preservesPartitioning?

2015-03-26 Thread Zhan Zhang
= self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, preservesPartitioning = true) } What you want: def mapValues[U](f: (K, V) => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new

preservesPartitioning

2014-07-17 Thread Kamal Banga
Hi All, The function *mapPartitions* in RDD.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala takes a boolean parameter *preservesPartitioning*. It seems that if that parameter is passed as *false*, the passed function f will operate on the data only
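For reference, a sketch of how the flag behaves through the public API (assumes a live SparkContext `sc`; the flag does not change what data f sees, it only tells Spark whether the result keeps the parent's Partitioner):

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))

// f receives exactly the same iterators either way.
val kept    = pairs.mapPartitions(_.map(identity), preservesPartitioning = true)
val dropped = pairs.mapPartitions(_.map(identity))   // default: false

kept.partitioner      // Some(...) — the parent's HashPartitioner is retained
dropped.partitioner   // None — Spark assumes the partitioning may be invalid
```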

Re: preservesPartitioning

2014-07-17 Thread Matei Zaharia
Hi Kamal, This is not what preservesPartitioning does -- actually what it means is that if the RDD has a Partitioner set (which means it's an RDD of key-value pairs and the keys are grouped in a known way, e.g. hashed or range-partitioned), your map function is not changing the partition
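This point can be observed directly from the `partitioner` field (a sketch, assuming a live SparkContext `sc`):

```scala
import org.apache.spark.HashPartitioner

val byKey = sc.parallelize(Seq((1, 1), (2, 2))).partitionBy(new HashPartitioner(2))

byKey.map(identity).partitioner    // None: map may change keys, so Spark drops it
byKey.mapValues(_ + 1).partitioner // Some(...): keys untouched, partitioner kept
```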