Thanks all for the quick response.

Thanks.

Zhan Zhang

On Mar 26, 2015, at 3:14 PM, Patrick Wendell <pwend...@gmail.com> wrote:

> I think we have a version of mapPartitions that allows you to tell
> Spark the partitioning is preserved:
> 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L639
> 
> We could also add a map function that does same. Or you can just write
> your map using an iterator.
> 
> - Patrick
> 
> On Thu, Mar 26, 2015 at 3:07 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>> This is just a deficiency of the api, imo. I agree: mapValues could
>> definitely be a function (K, V)=>V1. The option isn't set by the function,
>> it's on the RDD. So you could look at the code and do this.
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>> 
>> def mapValues[U](f: V => U): RDD[(K, U)] = {
>>    val cleanF = self.context.clean(f)
>>    new MapPartitionsRDD[(K, U), (K, V)](self,
>>      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
>>      preservesPartitioning = true)
>>  }
>> 
>> What you want:
>> 
>> def mapValues[U](f: (K, V) => U): RDD[(K, U)] = {
>>    val cleanF = self.context.clean(f)
>>    new MapPartitionsRDD[(K, U), (K, V)](self,
>>      (context, pid, iter) => iter.map { case t@(k, _) => (k, cleanF(t)) },
>>      preservesPartitioning = true)
>>  }
>> 
>> One of the nice things about spark is that making such new operators is very
>> easy :)
>> 
>> 2015-03-26 17:54 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:
>> 
>>> Thanks Jonathan. You are right regarding rewrite the example.
>>> 
>>> I mean providing such option to developer so that it is controllable. The
>>> example may seems silly, and I don't know the use cases.
>>> 
>>> But for example, if I also want to operate both the key and value part to
>>> generate some new value with keeping key part untouched. Then mapValues may
>>> not be able to  do this.
>>> 
>>> Changing the code to allow this is trivial, but I don't know whether there
>>> is some special reason behind this.
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> 
>>> 
>>> 
>>> On Mar 26, 2015, at 2:49 PM, Jonathan Coveney <jcove...@gmail.com> wrote:
>>> 
>>> I believe if you do the following:
>>> 
>>> 
>>> sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString
>>> 
>>> (8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
>>> |  MapPartitionsRDD[33] at mapValues at <console>:23 []
>>> |  ShuffledRDD[32] at reduceByKey at <console>:23 []
>>> +-(8) MapPartitionsRDD[31] at map at <console>:23 []
>>>    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []
>>> 
>>> The difference is that spark has no way to know that your map closure
>>> doesn't change the key. if you only use mapValues, it does. Pretty cool that
>>> they optimized that :)
>>> 
>>> 2015-03-26 17:44 GMT-04:00 Zhan Zhang <zzh...@hortonworks.com>:
>>>> 
>>>> Hi Folks,
>>>> 
>>>> Does anybody know what is the reason not allowing preserverPartitioning
>>>> in RDD.map? Do I miss something here?
>>>> 
>>>> Following example involves two shuffles. I think if preservePartitioning
>>>> is allowed, we can avoid the second one, right?
>>>> 
>>>> val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
>>>> val r2 = r1.map((_, 1))
>>>> val r3 = r2.reduceByKey(_+_)
>>>> val r4 = r3.map(x=>(x._1, x._2 + 1))
>>>> val r5 = r4.reduceByKey(_+_)
>>>> r5.collect.foreach(println)
>>>> 
>>>> scala> r5.toDebugString
>>>> res2: String =
>>>> (8) ShuffledRDD[4] at reduceByKey at <console>:29 []
>>>> +-(8) MapPartitionsRDD[3] at map at <console>:27 []
>>>>    |  ShuffledRDD[2] at reduceByKey at <console>:25 []
>>>>    +-(8) MapPartitionsRDD[1] at map at <console>:23 []
>>>>       |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
>>>> 
>>>> Thanks.
>>>> 
>>>> Zhan Zhang
>>>> 
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>> 
>>> 
>>> 
>> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to