Re: Spark partitioning question

2015-05-05 Thread Marius Danciu
Turned out that it was sufficient to do repartitionAndSortWithinPartitions
... so far so good ;)
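
A minimal, self-contained sketch of what that looks like (the app name,
master, sample data, and partitioner below are stand-ins, not the original
job's):

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

  object RepartitionAndSortSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("sketch").setMaster("local[2]"))
      val pairs = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))
      // one shuffle: each record moves to the partition chosen by the
      // partitioner, and records are sorted by key within each partition
      val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(1))
      sorted.collect().foreach(println)
      sc.stop()
    }
  }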


Re: Spark partitioning question

2015-05-05 Thread Marius Danciu
Hi Imran,

Yes, that's what MyPartitioner does. I do see (using traces from
MyPartitioner) that the key is assigned to partition 0, but then I see the
same record arriving in both YARN containers (it shows up in the logs of
both). Basically I need to emulate a Hadoop map-reduce job in Spark, and
groupByKey seemed a natural fit ( ... I am aware of its limitations).

Thanks,
Marius


Re: Spark partitioning question

2015-05-04 Thread Imran Rashid
Hi Marius,

I am also a little confused -- are you saying that MyPartitioner is
basically something like:

import org.apache.spark.Partitioner

class MyPartitioner extends Partitioner {
  // every record goes to the single partition 0
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}

??

If so, I don't understand how you'd ever end up with data in two
partitions.  Indeed, then everything before the call to
partitionBy(myPartitioner) is somewhat irrelevant.  The important point is
that partitionBy should put all the data in one partition, and the
operations after that do not move data between partitions.  So if you're
really observing data in two partitions, it would be good to know more
about what version of Spark you are on, your data, etc., as it sounds like
a bug.
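
One quick way to check where records actually land (a sketch, assuming the
MyPartitioner above and a stand-in pair RDD named rdd):

  val placed = rdd.partitionBy(new MyPartitioner)
  placed.mapPartitionsWithIndex { (i, it) =>
    it.map(kv => s"partition $i: $kv")
  }.collect().foreach(println)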

But I have a feeling there is some misunderstanding about what your
partitioner is doing.  E.g., I think doing groupByKey followed by sortByKey
doesn't make a lot of sense -- in general one sortByKey is all you need
(it's not exactly the same, but most probably close enough, and it avoids
doing another expensive shuffle).  If you can share a bit more information
on your partitioner, and what properties you need for your f, that might
help.
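
A sketch of the two variants, assuming an existing SparkContext sc and a
stand-in pair RDD (not the real data):

  val pairs = sc.parallelize(Seq(2 -> "b", 1 -> "a", 1 -> "c"))
  // current pipeline: two shuffles, values grouped per key, keys sorted
  val groupedSorted = pairs.groupByKey().sortByKey()
  // the suggestion: a single shuffle; keys come out sorted, values not
  // grouped, which is often close enough
  val justSorted = pairs.sortByKey()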

thanks,
Imran


On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com
wrote:

 Hello all,

 I have the following Spark (pseudo)code:

 rdd = mapPartitionsWithIndex(...)
       .mapPartitionsToPair(...)
       .groupByKey()
       .sortByKey(comparator)
       .partitionBy(myPartitioner)
       .mapPartitionsWithIndex(...)
       .mapPartitionsToPair( *f* )

 The input data has 2 input splits (YARN 2.6.0).
 myPartitioner places all the records on partition 0, which is correct, so
 the intuition is that f, provided to the last transformation
 (mapPartitionsToPair), would run sequentially inside a single YARN
 container. However, from the YARN logs I do see that both YARN containers
 are processing records from the same partition ... and *sometimes* the
 overall job fails (due to the code in f, which expects a certain order of
 records): YARN container 1 receives the records as expected, whereas YARN
 container 2 receives a subset of records ... for a reason I cannot
 explain, and f fails.

 The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of the
 sorted records to the YARN containers.


 If any of this makes any sense to you, please let me know what I am
 missing.



 Best,
 Marius



Re: Spark partitioning question

2015-04-28 Thread Marius Danciu
Thank you Silvio,

I am aware of groupByKey's limitations, and it is a candidate for
replacement.

I did try repartitionAndSortWithinPartitions, but then I end up with
perhaps too much shuffling: one shuffle from groupByKey and another from
the repartition.

My expectation was that since N records are partitioned to the same
partition ... say 0, doing a mapPartitions on the resulting RDD would place
all records for partition 0 into a single task on a single node. It seems
to me that this is not quite the case, since the N records can span
multiple HDFS blocks, and the subsequent mapPartitions operation would be
parallelized across multiple nodes. In my case I see 2 YARN containers
receiving records during a mapPartitions operation applied on the sorted
partition. I need to test more, but it seems that applying the same
partitioner again right before the last mapPartitions can help.
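
A self-contained sketch of that workaround (the partitioner, sample data,
and step functions are stand-ins for the real job):

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

  object ReapplyPartitionerSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("sketch").setMaster("local[2]"))
      val part = new HashPartitioner(1) // stand-in for myPartitioner
      val rdd = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))

      val out = rdd
        .partitionBy(part)
        .mapPartitions(_.map(identity)) // Spark drops the partitioner here
        .partitionBy(part)              // re-applied right before the final pass
        .mapPartitions(_.map { case (k, v) => s"$k=$v" })

      out.collect().foreach(println)
      sc.stop()
    }
  }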

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito 
silvio.fior...@granturing.com wrote:

 Hi Marius,

 What’s the expected output?

 I would recommend avoiding the groupByKey if possible, since it’s going
 to force all records for each key to go to a single executor, which may
 overload it.

 Also, if you need to sort and repartition, try using
 repartitionAndSortWithinPartitions to do it in one shot.

 Thanks,
 Silvio


Re: Spark partitioning question

2015-04-28 Thread Silvio Fiorito
So the other issue could be due to the fact that, by using mapPartitions
after the partitionBy, you essentially lose the partitioning of the keys,
since Spark assumes the keys may have been altered in the map phase. So
really the partitioner gets lost after the mapPartitions; that’s why you
need to do it again.
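
Side note: the Scala RDD API's mapPartitions also takes a
preservesPartitioning flag; if f really leaves the keys untouched, passing
true tells Spark to keep the existing partitioner instead of discarding it.
A minimal sketch (names and data are hypothetical):

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

  object PreservesPartitioningSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(
        new SparkConf().setAppName("sketch").setMaster("local[2]"))
      val part = new HashPartitioner(1)
      val rdd = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part)

      // keys unchanged, so it is safe to tell Spark the layout is preserved
      val kept = rdd.mapPartitions(_.map(identity), preservesPartitioning = true)
      println(kept.partitioner)    // Some(...) -- the partitioner survives

      val dropped = rdd.mapPartitions(_.map(identity))
      println(dropped.partitioner) // None -- the partitioner is discarded

      sc.stop()
    }
  }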

