Turned out that it was sufficient to do repartitionAndSortWithinPartitions
... so far so good ;)
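
For anyone finding this thread later, a rough sketch of the shape of that
fix (untested, with placeholder key/value types and a hypothetical
single-partition partitioner; the real job is only shown as pseudocode
further down the thread):

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner that sends every record to partition 0,
// mirroring the MyPartitioner discussed below.
class SinglePartitioner extends Partitioner {
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}

// One shuffle: repartition with the custom partitioner and sort each
// resulting partition by key, so f can rely on seeing records in key order.
def run(pairs: RDD[(String, String)],
        f: Iterator[(String, String)] => Iterator[(String, String)])
    : RDD[(String, String)] =
  pairs
    .repartitionAndSortWithinPartitions(new SinglePartitioner)
    .mapPartitions(f)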

On Tue, May 5, 2015 at 9:45 AM Marius Danciu <marius.dan...@gmail.com>
wrote:

> Hi Imran,
>
> Yes, that's what MyPartitioner does. I do see (using traces from
> MyPartitioner) that the key is assigned to partition 0, but then I see
> the record arriving in both YARN containers (I see it in the logs).
> Basically I need to emulate a Hadoop map-reduce job in Spark and groupByKey
> seemed a natural fit ( ... I am aware of its limitations).
>
> Thanks,
> Marius
>
> On Mon, May 4, 2015 at 10:45 PM Imran Rashid <iras...@cloudera.com> wrote:
>
>> Hi Marius,
>>
>> I am also a little confused -- are you saying that myPartitions is
>> basically something like:
>>
>> class MyPartitioner extends Partitioner {
>>   def numPartitions = 1
>>   def getPartition(key: Any) = 0
>> }
>>
>> ??
>>
>> If so, I don't understand how you'd ever end up with data in two
>> partitions. Indeed, then everything before the call to
>> partitionBy(myPartitioner) is somewhat irrelevant.  The important point is
>> that partitionBy should put all the data in one partition, and then the
>> operations after that do not move data between partitions.  So if you're
>> really observing data in two partitions, it would be good to know more about
>> what version of Spark you are on, your data, etc., as it sounds like a bug.
>>
>> But I have a feeling there is some misunderstanding about what your
>> partitioner is doing.  E.g., I think doing groupByKey followed by sortByKey
>> doesn't make a lot of sense -- in general one sortByKey is all you need
>> (it's not exactly the same, but most probably close enough, and it avoids
>> doing another expensive shuffle).  If you can share a bit more information
>> on your partitioner, and what properties you need for your "f", that might
>> help.
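>>
>> For example (just a sketch with made-up types, untested): if everything has
>> to end up in one sorted partition anyway, a single sortByKey avoids the
>> second shuffle:
>>
>> // pairs: RDD[(String, String)]
>> // f: Iterator[(String, String)] => Iterator[(String, String)]
>> val result = pairs
>>   .sortByKey(ascending = true, numPartitions = 1)  // one shuffle, keys sorted
>>   .mapPartitions(f)  // f sees all records, in key order, in one partition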
>>
>> thanks,
>> Imran
>>
>>
>> On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu <marius.dan...@gmail.com>
>> wrote:
>>
>>> Hello all,
>>>
>>> I have the following Spark (pseudo)code:
>>>
>>> rdd = mapPartitionsWithIndex(...)
>>>         .mapPartitionsToPair(...)
>>>         .groupByKey()
>>>         .sortByKey(comparator)
>>>         .partitionBy(myPartitioner)
>>>         .mapPartitionsWithIndex(...)
>>>         .mapPartitionsToPair( *f* )
>>>
>>> The input data has 2 input splits (YARN 2.6.0).
>>> myPartitioner partitions all the records onto partition 0, which is
>>> correct, so the intuition is that f, provided to the last transformation
>>> (mapPartitionsToPair), would run sequentially inside a single YARN
>>> container. However, from the YARN logs I see that both YARN containers
>>> are processing records from the same partition ... and *sometimes* the
>>> overall job fails (due to the code in f, which expects a certain order of
>>> records): YARN container 1 receives the records as expected, whereas
>>> YARN container 2 receives a subset of the records ... for a reason I
>>> cannot explain, and f fails.
>>>
>>> The overall behavior of this job is that sometimes it succeeds and
>>> sometimes it fails ... apparently due to inconsistent propagation of sorted
>>> records to the YARN containers.
>>>
>>>
>>> If any of this makes any sense to you, please let me know what I am
>>> missing.
>>>
>>>
>>>
>>> Best,
>>> Marius
>>>
>>
>>
