Thank you Silvio, I am aware of the groupByKey limitations and it is subject to replacement.
I did try repartitionAndSortWithinPartitions, but then I end up with maybe too much shuffling: one shuffle from groupByKey and another from the repartition.

My expectation was that since N records are partitioned to the same partition ... say 0, doing a mapPartitions on the resulting RDD would place all records for partition 0 on a single node. It seems to me that this is not quite the case, since N can span multiple HDFS blocks and the subsequent mapPartitions operation would be parallelized across multiple nodes. In my case I see 2 YARN containers receiving records during a mapPartitions operation applied on the sorted partition.

I need to test more, but it seems that applying the same partitioner again right before the last mapPartitions can help.

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito <silvio.fior...@granturing.com> wrote:

> Hi Marius,
>
> What’s the expected output?
>
> I would recommend avoiding the groupByKey if possible, since it’s going
> to force all records for each key to go to an executor, which may overload it.
>
> Also, if you need to sort and repartition, try using
> repartitionAndSortWithinPartitions to do it in one shot.
>
> Thanks,
> Silvio
>
> From: Marius Danciu
> Date: Tuesday, April 28, 2015 at 8:10 AM
> To: user
> Subject: Spark partitioning question
>
> Hello all,
>
> I have the following Spark (pseudo)code:
>
> rdd = mapPartitionsWithIndex(...)
>       .mapPartitionsToPair(...)
>       .groupByKey()
>       .sortByKey(comparator)
>       .partitionBy(myPartitioner)
>       .mapPartitionsWithIndex(...)
>       .mapPartitionsToPair( *f* )
>
> The input data has 2 input splits (YARN 2.6.0).
> myPartitioner partitions all the records to partition 0, which is correct,
> so the intuition is that f provided to the last transformation
> (mapPartitionsToPair) would run sequentially inside a single YARN
> container. However, from the YARN logs I do see that both YARN containers
> are processing records from the same partition ... and *sometimes* the
> overall job fails (due to the code in f, which expects a certain order of
> records): YARN container 1 receives the records as expected, whereas
> YARN container 2 receives a subset of records ... for a reason I cannot
> explain, and f fails.
>
> The overall behavior of this job is that sometimes it succeeds and
> sometimes it fails ... apparently due to inconsistent propagation of the
> sorted records to the YARN containers.
>
> If any of this makes any sense to you, please let me know what I am
> missing.
>
> Best,
> Marius
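
For illustration, a minimal Scala sketch of the single-shuffle approach discussed in this thread. SinglePartitioner is a hypothetical stand-in for myPartitioner, the (String, Int) record type is an assumption, and the final map is only a placeholder for the real f, none of which are shown in the original pseudocode:

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for myPartitioner from the thread: routes every
// record to partition 0 (illustration only).
class SinglePartitioner extends Partitioner {
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}

// Replaces the groupByKey + sortByKey + partitionBy sequence with a single
// shuffle: records are routed by the partitioner and sorted by key within
// each partition, so the final mapPartitions sees one partition's records,
// in key order, inside one task.
def process(keyed: RDD[(String, Int)]): RDD[(String, Int)] =
  keyed
    .repartitionAndSortWithinPartitions(new SinglePartitioner)
    .mapPartitions(
      iter => iter.map { case (k, v) => (k, v) },  // placeholder for f
      preservesPartitioning = true
    )
```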