Hi Imran,

Yes, that is exactly what MyPartitioner does. I do see (from the traces inside
MyPartitioner) that every key is assigned to partition 0, but then I see the
same records arriving in both YARN containers (they show up in both logs).
Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey
seemed a natural fit ( ... I am aware of its limitations).
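For reference, this is roughly what MyPartitioner looks like -- a simplified
sketch, not the actual code; the println is the trace I was referring to:

    import org.apache.spark.Partitioner

    // Simplified sketch: every key is forced onto partition 0.
    class MyPartitioner extends Partitioner {
      override def numPartitions: Int = 1
      override def getPartition(key: Any): Int = {
        println(s"MyPartitioner: key=$key -> partition 0") // trace in the logs
        0
      }
    }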
Thanks,
Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid <iras...@cloudera.com> wrote:

> Hi Marius,
>
> I am also a little confused -- are you saying that myPartitioner is
> basically something like:
>
>     class MyPartitioner extends Partitioner {
>       def numPartitions = 1
>       def getPartition(key: Any) = 0
>     }
>
> ?
>
> If so, I don't understand how you'd ever end up with data in two
> partitions. Indeed, then everything before the call to
> partitionBy(myPartitioner) is somewhat irrelevant. The important point is
> that partitionBy should put all the data in one partition, and the
> operations after that do not move data between partitions. So if you're
> really observing data in two partitions, it would be good to know more
> about which version of Spark you are on, your data, etc., as it sounds
> like a bug.
>
> But I have a feeling there is some misunderstanding about what your
> partitioner is doing. E.g., I think doing groupByKey followed by
> sortByKey doesn't make a lot of sense -- in general one sortByKey is all
> you need (it's not exactly the same, but most probably close enough, and
> it avoids doing another expensive shuffle). If you can share a bit more
> information on your partitioner, and what properties you need for your
> "f", that might help.
>
> thanks,
> Imran
>
>
> On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu <marius.dan...@gmail.com> wrote:
>
>> Hello all,
>>
>> I have the following Spark (pseudo)code:
>>
>>     rdd = mapPartitionsWithIndex(...)
>>           .mapPartitionsToPair(...)
>>           .groupByKey()
>>           .sortByKey(comparator)
>>           .partitionBy(myPartitioner)
>>           .mapPartitionsWithIndex(...)
>>           .mapPartitionsToPair( f )
>>
>> The input data has 2 input splits (YARN 2.6.0). myPartitioner assigns
>> all records to partition 0, which is correct, so the intuition is that
>> f, passed to the last transformation (mapPartitionsToPair), would run
>> sequentially inside a single YARN container. However, from the YARN
>> logs I do see that both containers are processing records from the same
>> partition ... and *sometimes* the overall job fails (due to the code in
>> f, which expects a certain order of records): container 1 receives the
>> records as expected, whereas container 2 receives only a subset of them
>> ... for a reason I cannot explain, and f fails.
>>
>> The overall behavior of this job is that sometimes it succeeds and
>> sometimes it fails ... apparently due to inconsistent propagation of
>> the sorted records to the YARN containers.
>>
>> If any of this makes any sense to you, please let me know what I am
>> missing.
>>
>> Best,
>> Marius
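P.S. Regarding avoiding the extra shuffle: would something like the following
match what you have in mind? Just a sketch (Spark >= 1.2; `pairs` and `f` are
placeholders for my map output and final function), using
repartitionAndSortWithinPartitions so the partitioning and the sort happen in
a single shuffle, instead of groupByKey + sortByKey + partitionBy:

    import org.apache.spark.SparkContext._ // pair-RDD implicits (pre-1.3)

    // pairs: RDD[(String, String)] from the map phase (illustrative types).
    // One shuffle: route every record through MyPartitioner and sort each
    // resulting partition by key -- essentially Hadoop's shuffle/sort phase.
    val sorted = pairs.repartitionAndSortWithinPartitions(new MyPartitioner)

    // f runs once per partition (here a single partition, so a single task)
    // and can rely on records arriving in key order.
    val result = sorted.mapPartitions(f)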