Hi Marius,

I am also a little confused -- are you saying that myPartitions is
basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

??

If so, I don't understand how you'd ever end up data in two partitions.
Indeed, than everything before the call to partitionBy(myPartitioner) is
somewhat irrelevant.  The important point is the partitionsBy should put
all the data in one partition, and then the operations after that do not
move data between partitions.  so if you're really observing data in two
partitions, then it would good to know more about what version of spark you
are on, your data etc. as it sounds like a bug.

But, I have a feeling there is some misunderstanding about what your
partitioner is doing.  Eg., I think doing groupByKey followed by sortByKey
doesn't make a lot of sense -- in general one sortByKey is all you need
(its not exactly the same, but most probably close enough, and avoids doing
another expensive shuffle).  If you can share a bit more information on
your partitioner, and what properties you need for your "f", that might
help.

thanks,
Imran


On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu <marius.dan...@gmail.com>
wrote:

> Hello all,
>
> I have the following Spark (pseudo)code:
>
> rdd = mapPartitionsWithIndex(...)
>         .mapPartitionsToPair(...)
>         .groupByKey()
>         .sortByKey(comparator)
>         .partitionBy(myPartitioner)
>         .mapPartitionsWithIndex(...)
>         .mapPartitionsToPair( *f* )
>
> The input data has 2 input splits (yarn 2.6.0).
> myPartitioner partitions all the records on partition 0, which is correct,
> so the intuition is that f provided to the last transformation
> (mapPartitionsToPair) would run sequentially inside a single yarn
> container. However from yarn logs I do see that both yarn containers are
> processing records from the same partition ... and *sometimes*  the over
> all job fails (due to the code in f which expects a certain order of
> records) and yarn container 1 receives the records as expected, whereas
> yarn container 2 receives a subset of records ... for a reason I cannot
> explain and f fails.
>
> The overall behavior of this job is that sometimes it succeeds and
> sometimes it fails ... apparently due to inconsistent propagation of sorted
> records to yarn containers.
>
>
> If any of this makes any sense to you, please let me know what I am
> missing.
>
>
>
> Best,
> Marius
>

Reply via email to