Re: Spark partitioning question
Turned out that it was sufficient to do repartitionAndSortWithinPartitions ... so far so good ;)

On Tue, May 5, 2015 at 9:45 AM Marius Danciu wrote:
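Marius's fix works because repartitionAndSortWithinPartitions does the partitioning and the per-partition sort in one shuffle, so no later step can reorder the data. A plain-Python sketch of that contract — the function name and data are illustrative, this is a model of the semantics, not Spark API:

```python
def repartition_and_sort_within_partitions(records, partitioner):
    """Model of Spark's repartitionAndSortWithinPartitions: bucket
    (key, value) pairs by partition id, then sort each bucket by key.
    The ordering guarantee holds only *within* each partition."""
    partitions = {}
    for key, value in records:
        partitions.setdefault(partitioner(key), []).append((key, value))
    for bucket in partitions.values():
        bucket.sort(key=lambda kv: kv[0])  # sort within this partition only
    return partitions

# Single-partition partitioner, as in this thread: every key maps to 0,
# so the whole dataset arrives sorted in one partition.
parts = repartition_and_sort_within_partitions(
    [("b", 2), ("a", 1), ("c", 3)], lambda key: 0)
print(parts)  # {0: [('a', 1), ('b', 2), ('c', 3)]}
```

Because the sort happens as part of the same shuffle that routes records to partitions, there is no window for a second shuffle to scramble the order — which is exactly the window the original sortByKey-then-partitionBy pipeline left open.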
Re: Spark partitioning question
Hi Imran,

Yes, that's what MyPartitioner does. I do see (using traces from MyPartitioner) that the key is assigned to partition 0, but then I see this record arriving in both YARN containers (I see it in the logs). Basically I need to emulate a Hadoop map-reduce job in Spark, and groupByKey seemed a natural fit ( ... I am aware of its limitations).

Thanks,
Marius

On Mon, May 4, 2015 at 10:45 PM Imran Rashid wrote:
Re: Spark partitioning question
Hi Marius,

I am also a little confused -- are you saying that MyPartitioner is basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

??

If so, I don't understand how you'd ever end up with data in two partitions. Indeed, then everything before the call to partitionBy(myPartitioner) is somewhat irrelevant. The important point is that partitionBy should put all the data in one partition, and the operations after that do not move data between partitions. So if you're really observing data in two partitions, it would be good to know more about what version of Spark you are on, your data, etc., as it sounds like a bug.

But I have a feeling there is some misunderstanding about what your partitioner is doing. E.g., I think doing groupByKey followed by sortByKey doesn't make a lot of sense -- in general one sortByKey is all you need (it's not exactly the same, but most probably close enough, and it avoids another expensive shuffle). If you can share a bit more information on your partitioner, and what properties you need for your "f", that might help.

thanks,
Imran

On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu wrote:
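Imran's single-partition Partitioner can be modeled outside Spark to check the expectation he describes: with numPartitions = 1 and getPartition always returning 0, every record must land in one bucket, so records showing up in two containers would indeed point at either a bug or a later transformation discarding the partitioner. A plain-Python analogue of his Scala sketch (names are illustrative, not Spark API):

```python
class SinglePartitioner:
    """Python analogue of the Scala Partitioner in Imran's message:
    one partition, every key mapped to partition 0."""
    num_partitions = 1

    def get_partition(self, key):
        return 0

def partition_by(records, partitioner):
    """Model of partitionBy: route each (key, value) pair to the
    bucket chosen by the partitioner."""
    buckets = [[] for _ in range(partitioner.num_partitions)]
    for key, value in records:
        buckets[partitioner.get_partition(key)].append((key, value))
    return buckets

buckets = partition_by([("a", 1), ("b", 2), ("c", 3)], SinglePartitioner())
print(len(buckets), len(buckets[0]))  # 1 3
```

With this partitioner the routing is total and deterministic: there is simply no second bucket for a record to reach, which is why Imran says everything before the partitionBy call is somewhat irrelevant.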
Re: Spark partitioning question
So the other issue could be due to the fact that by using mapPartitions after the partitionBy, you essentially lose the partitioning of the keys, since Spark assumes the keys were altered in the map phase. So really the partitionBy gets lost after the mapPartitions; that's why you need to do it again.

From: Marius Danciu
Date: Tuesday, April 28, 2015 at 9:53 AM
To: Silvio Fiorito, user
Subject: Re: Spark partitioning question
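Silvio's point is easy to model: Spark cannot prove that a map-side function left the keys alone, so map-like transformations discard the known partitioner unless the caller promises the keys are untouched (in Spark's Scala API, the preservesPartitioning flag on mapPartitions). A toy Python model of that bookkeeping — the class and names are illustrative, not Spark code:

```python
class PairData:
    """Toy model of a pair RDD's partitioner bookkeeping: the runtime
    cannot know whether a user function changed the keys, so it drops
    the partitioner unless the caller asserts the keys are unchanged."""
    def __init__(self, rows, partitioner=None):
        self.rows = rows
        self.partitioner = partitioner

    def map_partitions(self, fn, preserves_partitioning=False):
        # Keep the partitioner only if the caller promises keys are intact.
        kept = self.partitioner if preserves_partitioning else None
        return PairData(fn(self.rows), kept)

data = PairData([("a", 1)], partitioner="myPartitioner")
lost = data.map_partitions(lambda rows: rows)
kept = data.map_partitions(lambda rows: rows, preserves_partitioning=True)
print(lost.partitioner, kept.partitioner)  # None myPartitioner
```

This matches the observed behavior in the thread: after the mapPartitionsWithIndex step, downstream stages no longer see the data as partitioned by myPartitioner, so the single-partition guarantee silently evaporates.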
Re: Spark partitioning question
Thank you Silvio,

I am aware of the groupByKey limitations and it is slated for replacement. I did try repartitionAndSortWithinPartitions, but then I end up with maybe too much shuffling: one shuffle from groupByKey and another from the repartition.

My expectation was that since N records are partitioned to the same partition ... say 0, doing a mapPartitions on the resulting RDD would place all records for partition 0 on a single node. It seems to me that this is not quite the case, since those N records can span multiple HDFS blocks and the subsequent mapPartitions operation would be parallelized across multiple nodes. In my case I see 2 YARN containers receiving records during a mapPartitions operation applied on the sorted partition. I need to test more, but it seems that applying the same partitioner again right before the last mapPartitions can help.

Best,
Marius

On Tue, Apr 28, 2015 at 4:40 PM Silvio Fiorito <silvio.fior...@granturing.com> wrote:
Re: Spark partitioning question
Hi Marius,

What’s the expected output?

I would recommend avoiding groupByKey if possible, since it’s going to force all records for each key to go to a single executor, which may overload it.

Also, if you need to sort and repartition, try using repartitionAndSortWithinPartitions to do it in one shot.

Thanks,
Silvio

From: Marius Danciu
Date: Tuesday, April 28, 2015 at 8:10 AM
To: user
Subject: Spark partitioning question
Spark partitioning question
Hello all,

I have the following Spark (pseudo)code:

rdd = mapPartitionsWithIndex(...)
  .mapPartitionsToPair(...)
  .groupByKey()
  .sortByKey(comparator)
  .partitionBy(myPartitioner)
  .mapPartitionsWithIndex(...)
  .mapPartitionsToPair( *f* )

The input data has 2 input splits (YARN 2.6.0). myPartitioner assigns all the records to partition 0, which is correct, so the intuition is that f, provided to the last transformation (mapPartitionsToPair), would run sequentially inside a single YARN container. However, from the YARN logs I do see that both YARN containers are processing records from the same partition ... and *sometimes* the overall job fails (due to the code in f, which expects a certain order of records): YARN container 1 receives the records as expected, whereas YARN container 2 receives a subset of records ... for a reason I cannot explain, and f fails.

The overall behavior of this job is that sometimes it succeeds and sometimes it fails ... apparently due to inconsistent propagation of sorted records to the YARN containers.

If any of this makes any sense to you, please let me know what I am missing.

Best,
Marius