Also, I've noticed that .map() actually creates a MapPartitionsRDD under the hood, so I think the real difference is just in the API that's being exposed. With map() you don't have to think about partitions at all; with mapPartitions() you are handed the partition's iterator and can do things like chunking the data in the partition (fetching more than one record at a time).
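To make the chunking idea concrete, here is a minimal sketch in plain Java with no Spark dependency. The class name ChunkingSketch and the helpers lengthOf and chunked are made up for illustration; the Iterator<String> stands in for the input that mapPartitions() hands to your function. It assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ChunkingSketch {

    // Per-record style, as with map(): the function only ever sees one record.
    static int lengthOf(String record) {
        return record.length();
    }

    // Per-partition style, as with mapPartitions(): the function sees the
    // partition's iterator, so it can pull up to chunkSize records at a time.
    static Iterator<List<String>> chunked(final Iterator<String> input, final int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return new Iterator<List<String>>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public List<String> next() {
                if (!input.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Only one chunk is held in memory here, not the whole partition.
                List<String> chunk = new ArrayList<String>(chunkSize);
                while (chunk.size() < chunkSize && input.hasNext()) {
                    chunk.add(input.next());
                }
                return chunk;
            }
        };
    }

    public static void main(String[] args) {
        // Hypothetical partition contents.
        Iterator<String> partition =
                Arrays.asList("a", "bb", "ccc", "dddd", "eeeee").iterator();
        Iterator<List<String>> chunks = chunked(partition, 2);
        while (chunks.hasNext()) {
            System.out.println(chunks.next());
        }
    }
}
```

With the five sample records and a chunk size of 2, this yields two full chunks and a final chunk holding the one leftover record. A per-record function like lengthOf has no way to do this, because it never sees the neighbouring records.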
On Thu, Jun 25, 2015 at 12:19 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I don't know exactly what's going on under the hood but I would not assume
> that just because a whole partition is not being pulled into memory @ one
> time that that means each record is being pulled at 1 time. That's the
> beauty of exposing Iterators & Iterables in an API rather than collections-
> there's a bunch of buffering that can be hidden from the user to make the
> iterations as efficient as they can be.
>
> On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> yes, 1 partition per core and mapPartitions apply function on each
>> partition.
>>
>> Question is Does complete partition loads in memory so that function can
>> be applied to it or its an iterator and iterator.next() loads next record
>> and if yes then how is it efficient than map which also works on 1 record
>> at a time.
>>
>> Is the only difference is -- only while loop as in below runs per record
>> as in map . But code above that will be run once per partition.
>>
>> public Iterable<Integer> call(Iterator<String> input)
>>         throws Exception {
>>     List<Integer> output = new ArrayList<Integer>();
>>     while(input.hasNext()){
>>         output.add(input.next().length());
>>     }
>>
>> so if I don't have any heavy code above while loop, performance will be
>> same as of map function.
>>
>> On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren <inv...@gmail.com> wrote:
>>
>>> It's not the number of executors that matters, but the # of the CPU
>>> cores of your cluster.
>>>
>>> Each partition will be loaded on a core for computing.
>>>
>>> e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
>>> partitions (24 tasks for narrow dependency).
>>> Then all the 24 partitions will be loaded to your cluster in parallel,
>>> one on each core.
>>> You may notice that some tasks will finish more quickly than others. So
>>> divide the RDD into (2~3) x (# of cores) for better pipeline performance.
>>> Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
>>> cores, then first done first served until all 72 tasks are processed.
>>>
>>> Back to your origin question, map and mapPartitions are both
>>> transformation, but on different granularity.
>>> map => apply the function on each record in each partition.
>>> mapPartitions => apply the function on each partition.
>>> But the rule is the same, one partition per core.
>>>
>>> Hope it helps.
>>> Hao
>>>
>>> On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> say source is HDFS,And file is divided in 10 partitions. so what will
>>>> be input contains.
>>>>
>>>> public Iterable<Integer> call(Iterator<String> input)
>>>>
>>>> say I have 10 executors in job each having single partition.
>>>>
>>>> will it have some part of partition or complete. And if some when I
>>>> call input.next() - it will fetch rest or how is it handled ?
>>>>
>>>> On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> No, or at least, it depends on how the source of the partitions was
>>>>> implemented.
>>>>>
>>>>> On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
>>>>> <shushantaror...@gmail.com> wrote:
>>>>> > Does mapPartitions keep complete partitions in memory of executor as
>>>>> > iterable.
>>>>> >
>>>>> > JavaRDD<String> rdd = jsc.textFile("path");
>>>>> > JavaRDD<Integer> output = rdd.mapPartitions(new
>>>>> >         FlatMapFunction<Iterator<String>, Integer>() {
>>>>> >
>>>>> >     public Iterable<Integer> call(Iterator<String> input)
>>>>> >             throws Exception {
>>>>> >         List<Integer> output = new ArrayList<Integer>();
>>>>> >         while(input.hasNext()){
>>>>> >             output.add(input.next().length());
>>>>> >         }
>>>>> >         return output;
>>>>> >     }
>>>>> >
>>>>> > });
>>>>> >
>>>>> > Here does input is present in memory and can contain complete
>>>>> > partition of gbs ?
>>>>> > Will this function call(Iterator<String> input) is called only for
>>>>> > no of partitions(say if I have 10 in this example) times. Not no of
>>>>> > lines times(say 10000000) .
>>>>> >
>>>>> > And whats the use of mapPartitionsWithIndex ?
>>>>> >
>>>>> > Thanks
>>>
>>> --
>>> Hao Ren
>>>
>>> Data Engineer @ leboncoin
>>>
>>> Paris, France
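A note on the call() quoted in the thread above: it copies every result into an ArrayList before returning, so the whole output of the partition sits in memory regardless of how the input iterator is fed. Below is a rough sketch, in plain Java without Spark, of a lazy alternative that runs any per-partition setup once and computes each length only when the consumer asks for it. The class name LazyPartitionSketch, the lengths helper and the setupCalls counter are invented for illustration. The Spark 1.x signature quoted in the thread returns an Iterable, so a real job would need to wrap this iterator accordingly; that wiring is left out here.

```java
import java.util.Arrays;
import java.util.Iterator;

public class LazyPartitionSketch {

    // Counts how often the (hypothetical) per-partition setup runs.
    static int setupCalls = 0;

    // Stand-in for the body of call(Iterator<String> input).
    static Iterator<Integer> lengths(final Iterator<String> input) {
        // Code placed here runs once per partition, not once per record,
        // e.g. opening a connection or building a parser (hypothetical).
        setupCalls++;

        // Nothing is buffered: each length is computed on demand.
        return new Iterator<Integer>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public Integer next() {
                return input.next().length();
            }
        };
    }

    public static void main(String[] args) {
        // Hypothetical partition contents.
        Iterator<String> partition =
                Arrays.asList("spark", "map", "partitions").iterator();
        Iterator<Integer> out = lengths(partition);
        int total = 0;
        while (out.hasNext()) {
            total += out.next();
        }
        System.out.println("total length = " + total + ", setup calls = " + setupCalls);
    }
}
```

If there is no expensive setup before the loop, this does essentially the same per-record work as map(), which matches the conclusion reached in the thread.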