Thank you, Silvio,

I am aware of the groupByKey limitations, and it is due to be replaced.

I did try repartitionAndSortWithinPartitions, but then I end up with
possibly too much shuffling: one shuffle from groupByKey and another from
the repartition.

My expectation was that since N records are assigned to the same
partition ... say partition 0, doing a mapPartitions on the resulting RDD
would place all records for partition 0 on a single node. It seems to me
that this is not quite the case, since those N records can span multiple
HDFS blocks and the subsequent mapPartitions operation gets parallelized
across multiple nodes. In my case I see 2 YARN containers receiving records
during a mapPartitions operation applied to the sorted partition. I need to
test more, but it seems that applying the same partitioner again right
before the last mapPartitions can help; roughly along the lines of the
sketch below.
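
This is what I have in mind; a minimal, self-contained Scala sketch
(SingleTargetPartitioner is only a stand-in for my actual partitioner, and
the string formatting in the last mapPartitionsWithIndex stands in for f):

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Stand-in for myPartitioner: routes every record to partition 0.
class SingleTargetPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = 0
}

object PartitioningTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partitioning-test").setMaster("local[2]"))

    val pairRdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

    // Re-apply the partitioner immediately before the final mapPartitionsWithIndex,
    // so that every record of partition 0 lands in a single task.
    val result = pairRdd
      .partitionBy(new SingleTargetPartitioner(2))
      .mapPartitionsWithIndex { (idx, it) =>
        it.map { case (k, v) => s"partition=$idx key=$k value=$v" }
      }

    result.collect().foreach(println)
    sc.stop()
  }
}

The extra partitionBy costs one more shuffle, but it should guarantee that
the last stage sees all of partition 0 in a single task.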

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   Hi Marius,
>
>  What’s the expected output?
>
>  I would recommend avoiding groupByKey if possible, since it forces all
> records for each key onto a single executor, which may overload it.
>
>  Also if you need to sort and repartition, try using
> repartitionAndSortWithinPartitions to do it in one shot.
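>
>  For example, something roughly like this in Scala (just a sketch; the
> HashPartitioner and the dummy data stand in for whatever partitioner and
> input you actually use):
>
> import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
>
> object SortWithinPartitions {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setAppName("sort-within-partitions").setMaster("local[2]"))
>
>     val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3)), 2)
>
>     // One shuffle: records are routed by the partitioner and sorted by key
>     // within each partition, instead of sortByKey followed by partitionBy.
>     val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
>
>     sorted
>       .mapPartitionsWithIndex { (idx, it) => it.map { case (k, v) => (idx, k, v) } }
>       .collect()
>       .foreach(println)
>
>     sc.stop()
>   }
> }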
>
>  Thanks,
> Silvio
>
>   From: Marius Danciu
> Date: Tuesday, April 28, 2015 at 8:10 AM
> To: user
> Subject: Spark partitioning question
>
>   Hello all,
>
>  I have the following Spark (pseudo)code:
>
>  rdd = mapPartitionsWithIndex(...)
>         .mapPartitionsToPair(...)
>         .groupByKey()
>         .sortByKey(comparator)
>         .partitionBy(myPartitioner)
>         .mapPartitionsWithIndex(...)
>         .mapPartitionsToPair( f )
>
>  The input data has 2 input splits (YARN 2.6.0). myPartitioner places all
> the records in partition 0, which is correct, so the intuition is that f,
> passed to the last transformation (mapPartitionsToPair), would run
> sequentially inside a single YARN container. However, in the YARN logs I
> see that both YARN containers are processing records from the same
> partition ... and *sometimes* the overall job fails (because the code in f
> expects a certain order of records): YARN container 1 receives the records
> as expected, whereas YARN container 2 receives only a subset of them ...
> for a reason I cannot explain, and f fails.
>
>  The overall behavior of this job is that sometimes it succeeds and
> sometimes it fails ... apparently due to inconsistent propagation of the
> sorted records to the YARN containers.
>
>
>  If any of this makes any sense to you, please let me know what I am
> missing.
>
>
>
>  Best,
> Marius
>
