Hi Imran,

Yes, that's what MyPartitioner does. I do see (from traces in
MyPartitioner) that the key is assigned to partition 0, but then I see the
same record arriving in both YARN containers (it shows up in both
containers' logs). Basically, I need to emulate a Hadoop map-reduce job in
Spark, and groupByKey seemed a natural fit (... I am aware of its
limitations).
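
For illustration, tracing the partition assignment could look something
like this (a simplified sketch, not the actual job code; pairs stands in
for the real pair RDD):

  // Log where each record actually lands after partitionBy, then compare
  // with the per-container (executor) logs.
  pairs
    .partitionBy(new MyPartitioner)
    .mapPartitionsWithIndex { (idx, iter) =>
      iter.map { case (key, _) =>
        println(s"partition=$idx key=$key")  // shows up in the executor log
        key
      }
    }
    .count()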

Thanks,
Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid <iras...@cloudera.com> wrote:

> Hi Marius,
>
> I am also a little confused -- are you saying that myPartitioner is
> basically something like:
>
> import org.apache.spark.Partitioner
>
> class MyPartitioner extends Partitioner {
>   override def numPartitions: Int = 1
>   override def getPartition(key: Any): Int = 0  // every key goes to partition 0
> }
>
> ??
>
> If so, I don't understand how you'd ever end up with data in two
> partitions.  Indeed, then everything before the call to
> partitionBy(myPartitioner) is somewhat irrelevant.  The important point is
> that partitionBy should put all the data in one partition, and the
> operations after that do not move data between partitions.  So if you're
> really observing data in two partitions, it would be good to know more
> about which version of Spark you are on, your data, etc., as it sounds
> like a bug.
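>
> A quick sanity check along these lines might help (illustrative only;
> keyed stands in for whatever pair RDD you call partitionBy on):
>
>   val p = keyed.partitionBy(myPartitioner)
>   println(p.partitions.length)   // should print 1
>   println(p.partitioner)         // Some(MyPartitioner@...)
>   // narrow transformations afterwards keep the data where it is:
>   println(p.mapValues(identity).partitions.length)   // still 1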
>
> But I have a feeling there is some misunderstanding about what your
> partitioner is doing.  E.g., I think doing groupByKey followed by
> sortByKey doesn't make a lot of sense -- in general a single sortByKey is
> all you need (it's not exactly the same, but most probably close enough,
> and it avoids another expensive shuffle).  If you can share a bit more
> information on your partitioner, and what properties you need for your
> "f", that might help.
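>
> A rough sketch of what I mean (illustrative only; f is your last
> per-partition function and must return an iterator):
>
>   // sort once, straight into a single partition, instead of
>   // groupByKey + sortByKey + partitionBy
>   val sorted = pairs.sortByKey(ascending = true, numPartitions = 1)
>
>   sorted.mapPartitionsWithIndex { (idx, iter) =>
>     // iter sees every record of the partition, in key order
>     f(idx, iter)
>   }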
>
> thanks,
> Imran
>
>
> On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu <marius.dan...@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I have the following Spark (pseudo)code:
>>
>> rdd = mapPartitionsWithIndex(...)
>>         .mapPartitionsToPair(...)
>>         .groupByKey()
>>         .sortByKey(comparator)
>>         .partitionBy(myPartitioner)
>>         .mapPartitionsWithIndex(...)
>>         .mapPartitionsToPair( f )
>>
>> The input data has 2 input splits (YARN 2.6.0).
>> myPartitioner assigns all the records to partition 0, which is correct,
>> so the intuition is that f, provided to the last transformation
>> (mapPartitionsToPair), would run sequentially inside a single YARN
>> container. However, from the YARN logs I see that both YARN containers
>> are processing records from the same partition ... and *sometimes* the
>> overall job fails (due to the code in f, which expects a certain order of
>> records): YARN container 1 receives the records as expected, whereas YARN
>> container 2 receives only a subset of the records ... for a reason I
>> cannot explain, and f fails.
>>
>> The overall behavior of this job is that it sometimes succeeds and
>> sometimes fails ... apparently due to inconsistent propagation of the
>> sorted records to the YARN containers.
>>
>>
>> If any of this makes any sense to you, please let me know what I am
>> missing.
>>
>>
>>
>> Best,
>> Marius
>>
>
>
