---------- Forwarded message ----------
From: Hao Ren <inv...@gmail.com>
Date: Thu, Jun 25, 2015 at 7:03 PM
Subject: Re: map vs mapPartitions
To: Shushant Arora <shushantaror...@gmail.com>


In fact, map and mapPartitions both produce an RDD of the same type:
MapPartitionsRDD.

See the RDD API source code below:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

So even map uses an iterator!

For map, `iter.map(cleanF)` means that when an action is called, the passed
function is applied record by record as the partition's iterator is consumed.
For mapPartitions, your function is applied to the iterator itself. Nothing
guarantees that all records will be loaded into memory. For example, if the
function takes only the first record, as in
rdd.mapPartitions(iter => Iterator.single(iter.next)), the rest of the
iterator is not traversed.
It really depends on your function. mapPartitions gives you control at the
partition level, nothing more.
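
To see the laziness without a cluster, here is a minimal sketch that uses a
plain Scala iterator as a stand-in for one partition. The names
LazyPartitionSketch and pulledBy, and the counter, are illustrative
assumptions and not part of the Spark API; only the shape of the function
(Iterator[T] => Iterator[U]) matches what mapPartitions accepts.

```scala
// Plain Scala only; a Seq's iterator stands in for one partition (assumption).
object LazyPartitionSketch {
  // Runs f over a counting iterator and returns
  // (output, number of records actually pulled from the source).
  def pulledBy[U](records: Seq[String])(f: Iterator[String] => Iterator[U]): (List[U], Int) = {
    var pulled = 0
    // Iterator.map is lazy: the counter only advances when a record is requested.
    val source = records.iterator.map { r => pulled += 1; r }
    val out = f(source).toList
    (out, pulled)
  }

  def main(args: Array[String]): Unit = {
    val partition = Seq("a", "bb", "ccc")
    // Takes only the first record: the rest of the partition is never read.
    println(pulledBy(partition)(iter => Iterator.single(iter.next())))
    // Same shape as map(_.length): every record is read, one at a time.
    println(pulledBy(partition)(iter => iter.map(_.length)))
  }
}
```

Note that iter.next() throws NoSuchElementException on an empty partition;
iter.take(1) is a safer way to keep at most one record.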

The two APIs are for different purposes. The choice depends on your need.
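
One common reason to reach for mapPartitions is to pay a setup cost once per
partition instead of once per record. The sketch below simulates that with
plain Scala. ExpensiveResource and SetupCostSketch are hypothetical names
standing in for something like a connection or a parser, and the nested Seq
stands in for an RDD's partitions; none of this is Spark API.

```scala
object SetupCostSketch {
  // Counts how many times the hypothetical costly resource is constructed.
  var created = 0

  final class ExpensiveResource {
    created += 1
    def length(s: String): Int = s.length
  }

  // map-style: the function sees one record, so setup inside it runs per record.
  def perRecord(partitions: Seq[Seq[String]]): Seq[Int] =
    partitions.flatMap(p => p.iterator.map(s => new ExpensiveResource().length(s)))

  // mapPartitions-style: setup runs once per partition, then records stream through.
  def perPartition(partitions: Seq[Seq[String]]): Seq[Int] =
    partitions.flatMap { p =>
      val f: Iterator[String] => Iterator[Int] = iter => {
        val resource = new ExpensiveResource()
        iter.map(resource.length)
      }
      f(p.iterator)
    }
}
```

With two partitions holding five records in total, perRecord builds the
resource five times and perPartition builds it twice, while both return the
same lengths. If there is no such setup work, the two styles do the same
amount of work per record.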

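In the given example, your mapPartitions does the same thing as
rdd.map(_.length), and the per-record cost is the same. The one difference is
that your version collects the results into a List first, so the whole
partition's output sits in memory before it is returned.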
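
The Java function in this thread fills a list before returning it, whereas
returning a mapped iterator streams the results. The following sketch, again
in plain Scala with hypothetical names (BufferVsStreamSketch, buffered,
streaming, pulledBeforeFirstOutput), shows the difference by counting how
many input records are read before the first output becomes available.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferVsStreamSketch {
  // Mirrors the list-building style: collect everything, then hand it back.
  def buffered(iter: Iterator[String]): Iterator[Int] = {
    val output = ArrayBuffer.empty[Int]
    while (iter.hasNext) output += iter.next().length
    output.iterator
  }

  // Streaming style: same results, but nothing is accumulated.
  def streaming(iter: Iterator[String]): Iterator[Int] = iter.map(_.length)

  // Number of input records read before the first output is produced.
  def pulledBeforeFirstOutput(f: Iterator[String] => Iterator[Int], records: Seq[String]): Int = {
    var pulled = 0
    val out = f(records.iterator.map { r => pulled += 1; r })
    out.next()
    pulled
  }
}
```

For a three-record input, buffered reads all three records before yielding
anything, while streaming reads only one. On a partition of several GB, the
buffered style is the one that can exhaust executor memory.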


On Thu, Jun 25, 2015 at 5:36 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Yes, one partition per core, and mapPartitions applies the function to each
> partition.
>
> The question is: is the complete partition loaded into memory so that the
> function can be applied to it, or is it an iterator whose iterator.next()
> loads the next record? If the latter, how is it more efficient than map,
> which also works on one record at a time?
>
>
> Is the only difference that just the while loop below runs per record, as
> in map, while any code above it runs once per partition?
>
>
> public Iterable<Integer> call(Iterator<String> input)
>     throws Exception {
>   List<Integer> output = new ArrayList<Integer>();
>   while (input.hasNext()) {
>     output.add(input.next().length());
>   }
>   return output;
> }
>
>
> So if I don't have any heavy code above the while loop, performance will be
> the same as with the map function?
>
>
>
> On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> It's not the number of executors that matters, but the number of CPU cores
>> in your cluster.
>>
>> Each partition will be loaded on a core for computing.
>>
>> E.g., a cluster of 3 nodes has 24 cores, and you divide the RDD into 24
>> partitions (24 tasks for a narrow dependency).
>> Then all 24 partitions will be loaded onto your cluster in parallel,
>> one on each core.
>> You may notice that some tasks finish more quickly than others, so
>> divide the RDD into (2~3) x (# of cores) partitions for better pipelining.
>> Say the RDD has 72 partitions: initially 24 tasks run on the 24 cores,
>> then the rest are served as cores free up until all 72 tasks are processed.
>>
>> Back to your original question: map and mapPartitions are both
>> transformations, but at different granularities.
>> map => applies the function to each record in each partition.
>> mapPartitions => applies the function to each partition.
>> But the rule is the same: one partition per core.
>>
>> Hope it helps.
>> Hao
>>
>>
>>
>>
>> On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Say the source is HDFS and the file is divided into 10 partitions. What
>>> will input contain?
>>>
>>> public Iterable<Integer> call(Iterator<String> input)
>>>
>>> Say I have 10 executors in the job, each holding a single partition.
>>>
>>> Will input hold part of the partition or all of it? And if only part, will
>>> calling input.next() fetch the rest, or how is that handled?
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> No, or at least, it depends on how the source of the partitions was
>>>> implemented.
>>>>
>>>> On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
>>>> <shushantaror...@gmail.com> wrote:
>>>> > Does mapPartitions keep the complete partition in executor memory as
>>>> > an iterable?
>>>> >
>>>> > JavaRDD<String> rdd = jsc.textFile("path");
>>>> > JavaRDD<Integer> output = rdd.mapPartitions(new
>>>> > FlatMapFunction<Iterator<String>, Integer>() {
>>>> >
>>>> > public Iterable<Integer> call(Iterator<String> input)
>>>> > throws Exception {
>>>> > List<Integer> output = new ArrayList<Integer>();
>>>> > while(input.hasNext()){
>>>> > output.add(input.next().length());
>>>> > }
>>>> > return output;
>>>> > }
>>>> >
>>>> > });
>>>> >
>>>> >
>>>> > Here, is input held in memory, and can it contain a complete
>>>> > partition of several GB?
>>>> > Will call(Iterator<String> input) be called only once per partition
>>>> > (10 times in this example), rather than once per line (say 10000000
>>>> > times)?
>>>> >
>>>> >
>>>> > And what is the use of mapPartitionsWithIndex?
>>>> >
>>>> > Thanks
>>>> >
>>>>
>>>
>>>
>>
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


