Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead. The ambition is to
say "divide the data into partitions, but make sure you don't move it in
doing so".



On Tue, Aug 28, 2018 at 2:06 PM, Patrick McCarthy 
wrote:

> I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
> this is actually happening, it's just wasteful overhead.
>
> On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal 
> wrote:
>
>> Hi Patrick,
>>
>> Sorry, is there something here that helps you beyond repartition(number of
>> partitions) or calling your udf on foreachPartition? If your data is on
>> disk, Spark is already partitioning it for you by rows. How is adding the
>> host info helping?
>>
>> Thanks,
>> Sonal
>> Nube Technologies 
>>
>> 
>>
>>
>>
>> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
>> pmccar...@dstillery.com.invalid> wrote:
>>
>>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>>> is required but shuffling is not.
>>>
>>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>>> need to repartition(5) to get the task size down to an acceptable size
>>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>>> that I could save a lot of overhead with
>>>
>>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>>> 'randkey','host').apply(udf)
>>>
>>>
>>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>>> wrote:
>>>
>>>> Well if we think of shuffling as a necessity to perform an operation,
>>>> then the problem would be that you are adding an aggregation stage to a
>>>> job that is going to get shuffled anyway.  Like if you need to join two
>>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>>> by hostname prior to that or not.  My question is, is there anything else
>>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>>> already bucketed? Like you could enforce that data is where it is supposed
>>>> to be, but what else would you avoid?
>>>>
>>>> Sent from my iPhone
>>>>
>>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>>> pmccar...@dstillery.com.INVALID> wrote:
>>>> >
>>>> > When debugging some behavior on my YARN cluster I wrote the following
>>>> PySpark UDF to figure out what host was operating on what row of data:
>>>> >
>>>> > @F.udf(T.StringType())
>>>> > def add_hostname(x):
>>>> >
>>>> >     import socket
>>>> >
>>>> >     return str(socket.gethostname())
>>>> >
>>>> > It occurred to me that I could use this to enforce node-locality for
>>>> other operations:
>>>> >
>>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>>> >
>>>> > When working on a big job without obvious partition keys, this seems
>>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>>> >
>>>> > What problems would I introduce by trying to partition on hostname
>>>> like this?
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If
this is actually happening, it's just wasteful overhead.

On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal  wrote:

> Hi Patrick,
>
> Sorry, is there something here that helps you beyond repartition(number of
> partitions) or calling your udf on foreachPartition? If your data is on
> disk, Spark is already partitioning it for you by rows. How is adding the
> host info helping?
>
> Thanks,
> Sonal
> Nube Technologies 
>
> 
>
>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
> pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1tb of records on disk, I might
>> need to repartition(5) to get the task size down to an acceptable size
>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>> that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
>> 'randkey','host').apply(udf)
>>
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
>> wrote:
>>
>>> Well if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding an aggregation stage to a
>>> job that is going to get shuffled anyway.  Like if you need to join two
>>> datasets, then Spark will still shuffle the data, whether they are grouped
>>> by hostname prior to that or not.  My question is, is there anything else
>>> that you would expect to gain, except for enforcing maybe a dataset that is
>>> already bucketed? Like you could enforce that data is where it is supposed
>>> to be, but what else would you avoid?
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>> pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster I wrote the following
>>> PySpark UDF to figure out what host was operating on what row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >
>>> > import socket
>>> >
>>> > return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node-locality for
>>> other operations:
>>> >
>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> like this?
>>>
>>>
>>>
>>
>


Re: [External Sender] Pitfalls of partitioning by host?

2018-08-28 Thread Jayesh Lalwani
If you group by the host that you have computed using the UDF, Spark is
always going to shuffle your dataset, even if the end result is that all
the new partitions look exactly like the old partitions, just placed on
different nodes. Remember that the hostname will probably hash differently
than the partition key of the data.

Let's say what you are trying to do is read a file, apply a UDF, and write
out to a file. Without your "performance improvement", Spark will read the
partitions, apply the UDF to the rows in the partitions, and write the rows
out. With your upgrade, it will read the partitions, apply the hostname UDF,
shuffle by hostname, apply the UDF on the shuffled rows, and write the
data out.

If your intent is to increase efficiency, this will do the opposite of what
you are trying to do.
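
A rough way to picture the shuffle described above is a toy model in plain Python. It is a sketch, not Spark itself: the host names, row counts, and the use of zlib.crc32 as the hash are all assumptions made for illustration (Spark uses its own hash function), and 200 is used because it is the default value of spark.sql.shuffle.partitions.

```python
import zlib
from collections import Counter


def target_partition(key: str, num_partitions: int) -> int:
    """Toy stand-in for a hash partitioner: hash the key, take it modulo N."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle_by_host(rows, num_partitions):
    """rows is a list of (source_partition, host) pairs.

    Returns a Counter mapping each target partition to its row count.
    The target depends only on the host, never on where the row came from.
    """
    sizes = Counter()
    for _source_partition, host in rows:
        sizes[target_partition(host, num_partitions)] += 1
    return sizes


# Hypothetical cluster: 3 hosts, 12 input partitions of 1000 rows each.
hosts = ["node-a", "node-b", "node-c"]
rows = [(p, hosts[p % len(hosts)]) for p in range(12) for _ in range(1000)]

sizes = shuffle_by_host(rows, num_partitions=200)
print("non-empty partitions after grouping by host:", len(sizes))
print("largest partition:", max(sizes.values()), "rows")
```

In this model every row goes through the partitioner, and because there are only three distinct keys, at most three of the 200 target partitions receive any data. Twelve tasks of 1000 rows each turn into at most three tasks of at least 4000 rows each.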

On Mon, Aug 27, 2018 at 1:23 PM Patrick McCarthy
 wrote:

> When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
>
> @F.udf(T.StringType())
> def add_hostname(x):
>
> import socket
>
> return str(socket.gethostname())
>
> It occurred to me that I could use this to enforce node-locality for other
> operations:
>
> df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>
> When working on a big job without obvious partition keys, this seems like
> a very straightforward way to avoid a shuffle, but it seems too easy.
>
> What problems would I introduce by trying to partition on hostname like
> this?
>




Re: Pitfalls of partitioning by host?

2018-08-28 Thread Sonal Goyal
Hi Patrick,

Sorry, is there something here that helps you beyond repartition(number of
partitions) or calling your udf on foreachPartition? If your data is on
disk, Spark is already partitioning it for you by rows. How is adding the
host info helping?
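
To put a rough number on "Spark is already partitioning it for you", here is a back-of-the-envelope sketch in plain Python. It assumes a splittable file format and approximates the read-time partition count as total size divided by the spark.sql.files.maxPartitionBytes setting (128 MB by default). The helper name is made up for this example, and real split planning also takes other settings into account, so treat the result as an estimate only.

```python
import math

MB = 1024 ** 2
TB = 1024 ** 4


def estimated_read_partitions(total_bytes: int, max_partition_bytes: int = 128 * MB) -> int:
    """Approximate number of input partitions for a splittable file source."""
    return math.ceil(total_bytes / max_partition_bytes)


# 1 TB on disk with the default 128 MB split size.
print(estimated_read_partitions(1 * TB))            # 8192

# A smaller split size gives more, smaller tasks at read time.
print(estimated_read_partitions(1 * TB, 32 * MB))   # 32768
```

Under these assumptions the task size is already controlled when the files are read, which happens before any repartition() call and does not involve a shuffle.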

Thanks,
Sonal
Nube Technologies 





On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
pmccar...@dstillery.com.invalid> wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding an aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <pmccar...@dstillery.com.INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)
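
A toy model of what the ('randkey', 'host') key does to partition balance, in plain Python rather than Spark. The three host names are hypothetical and zlib.crc32 stands in for Spark's own hash function, so the exact counts are illustrative only. Note that repartitioning by column expressions is still a hash-partitioned shuffle, so this only models where rows land, not a way to avoid moving them.

```python
import zlib


def target_partition(key, num_partitions):
    """Toy hash partitioner for a composite key such as (randkey, host)."""
    data = "|".join(str(part) for part in key).encode("utf-8")
    return zlib.crc32(data) % num_partitions


# Hypothetical cluster with 3 hosts; randkey takes the values 0..999,
# matching F.floor(1000 * F.rand()).
hosts = ["node-a", "node-b", "node-c"]
keys = [(randkey, host) for host in hosts for randkey in range(1000)]

num_partitions = 5000
used = {target_partition(key, num_partitions) for key in keys}
empty = num_partitions - len(used)

print("distinct keys:", len(keys))
print("partitions that receive data:", len(used))
print("partitions left empty:", empty)
```

With 3 hosts there are only 3000 distinct keys, so at least 2000 of the 5000 requested partitions stay empty no matter how the hash behaves, and collisions make some of the remaining partitions larger than others.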


On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
wrote:

> Well if we think of shuffling as a necessity to perform an operation, then
> the problem would be that you are adding an aggregation stage to a job
> that is going to get shuffled anyway.  Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are grouped
> by hostname prior to that or not.  My question is, is there anything else
> that you would expect to gain, except for enforcing maybe a dataset that is
> already bucketed? Like you could enforce that data is where it is supposed
> to be, but what else would you avoid?
>
> Sent from my iPhone
>
> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy wrote:
> >
> > When debugging some behavior on my YARN cluster I wrote the following
> PySpark UDF to figure out what host was operating on what row of data:
> >
> > @F.udf(T.StringType())
> > def add_hostname(x):
> >
> > import socket
> >
> > return str(socket.gethostname())
> >
> > It occurred to me that I could use this to enforce node-locality for
> other operations:
> >
> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> >
> > When working on a big job without obvious partition keys, this seems
> like a very straightforward way to avoid a shuffle, but it seems too easy.
> >
> > What problems would I introduce by trying to partition on hostname like
> this?
>
>
>


Re: Pitfalls of partitioning by host?

2018-08-28 Thread Patrick McCarthy
Mostly I'm guessing that it adds efficiency to a job where partitioning is
required but shuffling is not.

For example, if I want to apply a UDF to 1tb of records on disk, I might
need to repartition(5) to get the task size down to an acceptable size
for my cluster. If I don't care that it's totally balanced, then I'd hope
that I could save a lot of overhead with

foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
'randkey','host').apply(udf)


On Tue, Aug 28, 2018 at 10:28 AM, Patrick McCarthy 
wrote:

> Mostly I'm guessing that it adds efficiency to a job where partitioning is
> required but shuffling is not.
>
> For example, if I want to apply a UDF to 1tb of records on disk, I might
> need to repartition(5) to get the task size down to an acceptable size
> for my cluster. If I don't care that it's totally balanced, then I'd hope
> that I could save a lot of overhead with
>
> foo = df.withColumn('randkey',F.floor(1000*F.rand())).repartition(5000,
> 'randkey','host').apply(udf)
>
> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz 
> wrote:
>
>> Well if we think of shuffling as a necessity to perform an operation,
>> then the problem would be that you are adding an aggregation stage to a
>> job that is going to get shuffled anyway.  Like if you need to join two
>> datasets, then Spark will still shuffle the data, whether they are grouped
>> by hostname prior to that or not.  My question is, is there anything else
>> that you would expect to gain, except for enforcing maybe a dataset that is
>> already bucketed? Like you could enforce that data is where it is supposed
>> to be, but what else would you avoid?
>>
>> Sent from my iPhone
>>
>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <pmccar...@dstillery.com.INVALID> wrote:
>> >
>> > When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>> >
>> > @F.udf(T.StringType())
>> > def add_hostname(x):
>> >
>> > import socket
>> >
>> > return str(socket.gethostname())
>> >
>> > It occurred to me that I could use this to enforce node-locality for
>> other operations:
>> >
>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>> >
>> > When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>> >
>> > What problems would I introduce by trying to partition on hostname like
>> this?
>>
>>
>>
>


Re: Pitfalls of partitioning by host?

2018-08-27 Thread Michael Artz
Well, if we think of shuffling as a necessity to perform an operation, then the
problem would be that you are adding an aggregation stage to a job that is
going to get shuffled anyway.  Like if you need to join two datasets, then
Spark will still shuffle the data, whether they are grouped by hostname prior
to that or not.  My question is, is there anything else that you would expect
to gain, except for maybe enforcing a dataset that is already bucketed? Like
you could enforce that data is where it is supposed to be, but what else would
you avoid?
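
A small illustration of the join point, using made-up placement data in plain Python. The key names and host names are hypothetical. The idea is only that knowing which host a row sits on says nothing about where the matching row of the other dataset sits.

```python
# Hypothetical placement: join key -> host currently holding that row.
left_placement = {"user1": "node-a", "user2": "node-b", "user3": "node-c"}
right_placement = {"user1": "node-b", "user2": "node-b", "user3": "node-a"}


def keys_needing_movement(left, right):
    """Join keys whose matching rows sit on different hosts."""
    shared = left.keys() & right.keys()
    return sorted(key for key in shared if left[key] != right[key])


moved = keys_needing_movement(left_placement, right_placement)
print(moved)  # ['user1', 'user3']
```

Grouping either side by hostname leaves this placement unchanged, so the rows for user1 and user3 still have to cross the network before they can be joined.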

Sent from my iPhone

> On Aug 27, 2018, at 12:22 PM, Patrick McCarthy wrote:
> 
> When debugging some behavior on my YARN cluster I wrote the following PySpark 
> UDF to figure out what host was operating on what row of data:
> 
> @F.udf(T.StringType())
> def add_hostname(x):
> 
> import socket
> 
> return str(socket.gethostname())
> 
> It occurred to me that I could use this to enforce node-locality for other 
> operations:
> 
> df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
> 
> When working on a big job without obvious partition keys, this seems like a 
> very straightforward way to avoid a shuffle, but it seems too easy. 
> 
> What problems would I introduce by trying to partition on hostname like this?




Pitfalls of partitioning by host?

2018-08-27 Thread Patrick McCarthy
When debugging some behavior on my YARN cluster I wrote the following
PySpark UDF to figure out what host was operating on what row of data:

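from pyspark.sql import functions as F, types as T

@F.udf(T.StringType())
def add_hostname(x):
    # Runs on the executor, so this is the worker's hostname, not the driver's
    import socket
    return str(socket.gethostname())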

It occurred to me that I could use this to enforce node-locality for other
operations:

df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)

When working on a big job without obvious partition keys, this seems like a
very straightforward way to avoid a shuffle, but it seems too easy.

What problems would I introduce by trying to partition on hostname like
this?