Turned out that it was sufficient to do repartitionAndSortWithinPartitions ... so far so good ;)
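
For reference, a minimal sketch of what that change looks like, assuming a pair RDD whose key type has an implicit Ordering; keyedRdd, f and the single-partition MyPartitioner below are placeholder names, not the actual job code:

import org.apache.spark.Partitioner

// Routes every key to partition 0, as in the thread below.
class MyPartitioner extends Partitioner {
  def numPartitions: Int = 1
  def getPartition(key: Any): Int = 0
}

// One shuffle that both repartitions and sorts records by key within each
// partition, so the downstream f sees ordered records in a single partition.
val result = keyedRdd                                    // RDD[(K, V)], placeholder
  .repartitionAndSortWithinPartitions(new MyPartitioner)
  .mapPartitions(iter => f(iter))                        // f relies on per-partition key order

Unlike sortByKey followed by partitionBy, there is no second shuffle after the sort, so the ordering inside the target partition is not lost.
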
On Tue, May 5, 2015 at 9:45 AM Marius Danciu <marius.dan...@gmail.com> wrote:

> Hi Imran,
>
> Yes, that's what MyPartitioner does. I do see (using traces from
> MyPartitioner) that the key is partitioned on partition 0, but then I see
> this record arriving in both YARN containers (I see it in the logs).
> Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey
> seemed a natural fit ( ... I am aware of its limitations).
>
> Thanks,
> Marius
>
> On Mon, May 4, 2015 at 10:45 PM Imran Rashid <iras...@cloudera.com> wrote:
>
>> Hi Marius,
>>
>> I am also a little confused -- are you saying that myPartitioner is
>> basically something like:
>>
>> class MyPartitioner extends Partitioner {
>>   def numPartitions = 1
>>   def getPartition(key: Any) = 0
>> }
>>
>> ??
>>
>> If so, I don't understand how you'd ever end up with data in two partitions.
>> Indeed, then everything before the call to partitionBy(myPartitioner) is
>> somewhat irrelevant. The important point is that partitionBy should put
>> all the data in one partition, and the operations after that do not
>> move data between partitions. So if you're really observing data in two
>> partitions, it would be good to know more about what version of Spark you
>> are on, your data, etc., as it sounds like a bug.
>>
>> But I have a feeling there is some misunderstanding about what your
>> partitioner is doing. E.g., I think doing groupByKey followed by sortByKey
>> doesn't make a lot of sense -- in general one sortByKey is all you need
>> (it's not exactly the same, but most probably close enough, and it avoids
>> another expensive shuffle). If you can share a bit more information on
>> your partitioner, and what properties you need for your "f", that might
>> help.
>>
>> thanks,
>> Imran
>>
>>
>> On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu <marius.dan...@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I have the following Spark (pseudo)code:
>>>
>>> rdd = mapPartitionsWithIndex(...)
>>>   .mapPartitionsToPair(...)
>>>   .groupByKey()
>>>   .sortByKey(comparator)
>>>   .partitionBy(myPartitioner)
>>>   .mapPartitionsWithIndex(...)
>>>   .mapPartitionsToPair( *f* )
>>>
>>> The input data has 2 input splits (YARN 2.6.0).
>>> myPartitioner puts all the records on partition 0, which is
>>> correct, so the intuition is that *f*, provided to the last transformation
>>> (mapPartitionsToPair), would run sequentially inside a single YARN
>>> container. However, from the YARN logs I see that both YARN containers are
>>> processing records from the same partition ... and *sometimes* the
>>> overall job fails (due to the code in *f*, which expects a certain order of
>>> records): YARN container 1 receives the records as expected, whereas
>>> YARN container 2 receives only a subset of records ... for a reason I cannot
>>> explain, and *f* fails.
>>>
>>> The overall behavior of this job is that sometimes it succeeds and
>>> sometimes it fails ... apparently due to inconsistent propagation of sorted
>>> records to the YARN containers.
>>>
>>> If any of this makes any sense to you, please let me know what I am
>>> missing.
>>>
>>> Best,
>>> Marius