Also,

I've noticed that .map() actually creates a MapPartitionsRDD under the
hood, so I think the real difference is just in the API that's being
exposed. You can use map() and not have to think about partitions at
all, or you can use mapPartitions() and do things like chunking the
data in a partition (fetching more than one record at a time).
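
A minimal sketch of the chunking idea, in plain Java so it runs without a
cluster. It assumes you start from the per-partition Iterator<String> that
mapPartitions hands you and groups the records into fixed-size batches
lazily, so only one batch is in memory at a time. BatchingIterator and the
batch size are hypothetical illustrations, not part of Spark's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: lazily groups the records of one partition into
// batches of at most batchSize, pulling from the source only on demand.
class BatchingIterator<T> implements Iterator<List<T>> {
  private final Iterator<T> source;
  private final int batchSize;

  BatchingIterator(Iterator<T> source, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.source = source;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public List<T> next() {
    if (!source.hasNext()) {
      throw new NoSuchElementException();
    }
    // Only this batch is materialized; the rest of the partition stays
    // behind the source iterator until it is asked for.
    List<T> batch = new ArrayList<>(batchSize);
    while (batch.size() < batchSize && source.hasNext()) {
      batch.add(source.next());
    }
    return batch;
  }
}

class BatchingDemo {
  public static void main(String[] args) {
    Iterator<String> partition =
        Arrays.asList("a", "b", "c", "d", "e").iterator();
    Iterator<List<String>> batches = new BatchingIterator<>(partition, 2);
    while (batches.hasNext()) {
      // e.g. one remote lookup per batch instead of one per record
      System.out.println(batches.next());
    }
  }
}
```

Inside a mapPartitions function you would do the per-batch work in that
loop and emit results as you go, which keeps memory bounded by the batch
size rather than the partition size.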

On Thu, Jun 25, 2015 at 12:19 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I don't know exactly what's going on under the hood, but I would not assume
> that just because a whole partition is not being pulled into memory at one
> time, each record is being pulled one at a time. That's the
> beauty of exposing Iterators & Iterables in an API rather than collections:
> there's a bunch of buffering that can be hidden from the user to make the
> iterations as efficient as they can be.
>
> On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Yes, 1 partition per core, and mapPartitions applies the function to each
>> partition.
>>
>> The question is: does the complete partition get loaded into memory so that
>> the function can be applied to it, or is it an iterator where
>> iterator.next() loads the next record? And if it's an iterator, how is it
>> more efficient than map, which also works on 1 record at a time?
>>
>>
>> Is the only difference that only the while loop below runs per record, as
>> in map, while the code above it runs once per partition?
>>
>>
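>> public Iterable<Integer> call(Iterator<String> input)
>>     throws Exception {
>>   // code here runs once per partition
>>   List<Integer> output = new ArrayList<Integer>();
>>   while (input.hasNext()) {
>>     // loop body runs once per record
>>     output.add(input.next().length());
>>   }
>>   return output;
>> }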
>>
>>
>> So if I don't have any heavy code above the while loop, performance will be
>> the same as with the map function.
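
A plain-Java sketch (no Spark involved) of the two points in this question:
code before the loop runs once per partition, and building an ArrayList, as
the snippet above does, holds the whole partition's output in memory, while
returning a lazily mapped iterator does not. ExpensiveResource is a
hypothetical stand-in for something like a DB connection or parser; its
counter only makes the once-per-partition behavior observable.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for costly setup (connection, parser, model...).
class ExpensiveResource {
  static int instances = 0;

  ExpensiveResource() {
    instances++; // count how often the setup actually runs
  }

  int score(String s) {
    return s.length();
  }
}

class PerPartitionDemo {
  // Shape of a mapPartitions body: setup runs once, then records are
  // transformed lazily as the caller pulls them; no output list is built.
  static Iterator<Integer> processPartition(Iterator<String> input) {
    final ExpensiveResource resource = new ExpensiveResource();
    return new Iterator<Integer>() {
      @Override
      public boolean hasNext() {
        return input.hasNext();
      }

      @Override
      public Integer next() {
        return resource.score(input.next());
      }
    };
  }

  // Shape of a map body doing the same setup: it runs once per record.
  static int processRecord(String record) {
    return new ExpensiveResource().score(record);
  }

  public static void main(String[] args) {
    List<String> partition = Arrays.asList("spark", "map", "partitions");

    Iterator<Integer> out = processPartition(partition.iterator());
    while (out.hasNext()) {
      System.out.println(out.next());
    }
    System.out.println("setups, mapPartitions style: " + ExpensiveResource.instances);

    ExpensiveResource.instances = 0;
    for (String r : partition) {
      processRecord(r);
    }
    System.out.println("setups, map style: " + ExpensiveResource.instances);
  }
}
```

In the Spark 1.x Java API used in this thread, call() returns an Iterable,
so a lazy iterator like this would have to be wrapped in one; the point is
only that nothing forces the whole output into a list.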
>>
>>
>>
>> On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren <inv...@gmail.com> wrote:
>>
>>> It's not the number of executors that matters, but the number of CPU
>>> cores in your cluster.
>>>
>>> Each partition will be loaded onto a core for computing.
>>>
>>> E.g., a cluster of 3 nodes has 24 cores, and you divide the RDD into 24
>>> partitions (24 tasks for a narrow dependency).
>>> Then all 24 partitions will be loaded onto your cluster in parallel,
>>> one on each core.
>>> You may notice that some tasks finish more quickly than others, so
>>> divide the RDD into 2-3 x (# of cores) partitions for better pipeline
>>> performance. Say your RDD has 72 partitions: initially 24 tasks run on
>>> the 24 cores, and each core picks up the next pending task as soon as it
>>> finishes, until all 72 tasks are processed.
>>>
>>> Back to your original question: map and mapPartitions are both
>>> transformations, but at different granularities.
>>> map => applies the function to each record in each partition.
>>> mapPartitions => applies the function to each partition.
>>> But the rule is the same: one partition per core.
>>>
>>> Hope it helps.
>>> Hao
>>>
>>>
>>>
>>>
>>> On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Say the source is HDFS and the file is divided into 10 partitions. What
>>>> will input contain in:
>>>>
>>>> public Iterable<Integer> call(Iterator<String> input)
>>>>
>>>> Say I have 10 executors in the job, each having a single partition.
>>>>
>>>> Will it hold part of the partition or the complete partition? And if only
>>>> part, when I call input.next(), will it fetch the rest, or how is that
>>>> handled?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> No, or at least, it depends on how the source of the partitions was
>>>>> implemented.
>>>>>
>>>>> On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
>>>>> <shushantaror...@gmail.com> wrote:
>>>>> > Does mapPartitions keep the complete partition in executor memory as an
>>>>> > Iterable?
>>>>> >
>>>>> > JavaRDD<String> rdd = jsc.textFile("path");
>>>>> > JavaRDD<Integer> output = rdd.mapPartitions(
>>>>> >     new FlatMapFunction<Iterator<String>, Integer>() {
>>>>> >       // input iterates over the records of one partition
>>>>> >       public Iterable<Integer> call(Iterator<String> input)
>>>>> >           throws Exception {
>>>>> >         List<Integer> output = new ArrayList<Integer>();
>>>>> >         while (input.hasNext()) {
>>>>> >           output.add(input.next().length());
>>>>> >         }
>>>>> >         return output;
>>>>> >       }
>>>>> >     });
>>>>> >
>>>>> >
>>>>> > Here, is input held in memory, and can it contain a complete partition
>>>>> > of GBs?
>>>>> > Will call(Iterator<String> input) be called only once per partition
>>>>> > (say 10 times in this example), and not once per line (say 10000000
>>>>> > times)?
>>>>> >
>>>>> >
>>>>> > And what's the use of mapPartitionsWithIndex?
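
One common use, sketched as a plain-Java function so it runs without a
cluster: the function given to mapPartitionsWithIndex also receives the
partition index, which lets you, for example, drop a header line that, for
a single input file, only appears at the start of partition 0. As far as I
know, the Java API takes this as a Function2<Integer, Iterator<T>,
Iterator<R>> plus a preservesPartitioning flag; HeaderSkipper and
dropHeader are hypothetical names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class HeaderSkipper {
  // Body you would pass to mapPartitionsWithIndex: the index tells us which
  // partition we are in, so only partition 0 skips its first line.
  static Iterator<String> dropHeader(int partitionIndex, Iterator<String> lines) {
    if (partitionIndex == 0 && lines.hasNext()) {
      lines.next(); // consume and discard the header line
    }
    return lines; // remaining lines are passed through lazily
  }

  public static void main(String[] args) {
    List<String> partition0 = Arrays.asList("id,name", "1,alice");
    List<String> partition1 = Arrays.asList("2,bob", "3,carol");

    Iterator<String> p0 = dropHeader(0, partition0.iterator());
    Iterator<String> p1 = dropHeader(1, partition1.iterator());
    while (p0.hasNext()) System.out.println(p0.next());
    while (p1.hasNext()) System.out.println(p1.next());
  }
}
```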
>>>>> >
>>>>> > Thanks
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Hao Ren
>>>
>>> Data Engineer @ leboncoin
>>>
>>> Paris, France
>>>
>>
>>
>