I'm not 100% sure, but a naive repartition() seems to cause a full
shuffle. If that's actually happening, it's just wasted overhead.
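
For what it's worth, the physical plan makes this easy to check; a minimal
sketch (the range DataFrame is just a stand-in for real data):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(10**7)  # stand-in for a real DataFrame

    # A plain repartition(n) shows up as a round-robin Exchange in the plan,
    # i.e. every row gets shuffled.
    df.repartition(5000).explain()
    # expect something like: Exchange RoundRobinPartitioning(5000)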

On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> Hi Patrick,
>
> Sorry, is there something here that helps you beyond repartition(number
> of partitions) or calling your UDF inside foreachPartition? If your data
> is on disk, Spark is already partitioning it for you by rows. How does
> adding the host info help?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <
> pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1TB of records on disk, I might
>> need to repartition(50000) to get the task size down to something
>> acceptable for my cluster. If I don't care that it's perfectly balanced,
>> then I'd hope that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey', F.floor(1000*F.rand())).repartition(5000,
>> 'randkey', 'host').apply(udf)
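
A note for anyone copying that snippet: .apply() isn't defined on a plain
DataFrame, only on grouped data, and it needs a grouped-map pandas UDF
(plus pyarrow on the workers). A rough sketch of the same idea on the
Spark 2.3-era API might look like this; the schema and the UDF body are
placeholders, and the 'host' column would come from the hostname UDF
quoted further down:

    from pyspark.sql import functions as F, types as T
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Placeholder output schema; a real job returns whatever its UDF computes.
    out_schema = T.StructType([
        T.StructField('randkey', T.LongType()),
        T.StructField('host', T.StringType()),
        T.StructField('n', T.LongType()),
    ])

    @pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
    def udf(pdf):
        # Each (randkey, host) group arrives as one pandas DataFrame.
        return pdf.groupby(['randkey', 'host']).size().reset_index(name='n')

    foo = (df                                     # assumed to already carry 'host'
           .withColumn('randkey', F.floor(1000 * F.rand()))
           .repartition(5000, 'randkey', 'host')  # one explicitly sized shuffle
           .groupBy('randkey', 'host')            # grouped-map apply needs a groupBy
           .apply(udf))

Whether the explicit repartition really spares the groupBy its own Exchange
is worth confirming with foo.explain().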
>>
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz <michaelea...@gmail.com>
>> wrote:
>>
>>> Well, if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding an aggregation stage to a
>>> job that is going to get shuffled anyway.  For example, if you need to
>>> join two datasets, then Spark will still shuffle the data, whether or not
>>> it was grouped by hostname beforehand.  My question is, is there anything
>>> else you would expect to gain, except maybe enforcing that a dataset is
>>> already bucketed? You could enforce that the data is where it is supposed
>>> to be, but what else would you avoid?
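
To make that concrete, a toy check (df, other_df and the 'user_id' join key
are made up, and it assumes the join is too big to be broadcast):

    by_host = df.repartition('host')             # pre-partitioned by hostname
    joined = by_host.join(other_df, 'user_id')   # join key is not 'host'
    joined.explain()
    # Both sides of the sort-merge join still get an
    # Exchange hashpartitioning(user_id, ...); partitioning by 'host' first
    # doesn't remove it.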
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <
>>> pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster I wrote the following
>>> PySpark UDF to figure out what host was operating on what row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >
>>> >     import socket
>>> >
>>> >     return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node-locality for
>>> other operations:
>>> >
>>> > df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> like this?
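
For anyone who wants to try this, a self-contained sketch (df is whatever
DataFrame you have in hand; the UDF ignores its argument, so a literal works
in place of the x above):

    from pyspark.sql import functions as F, types as T

    @F.udf(T.StringType())
    def add_hostname(x):
        import socket                      # resolved on the executor
        return str(socket.gethostname())

    hosted = df.withColumn('host', add_hostname(F.lit(1)))

    # Anything keyed on 'host' still plans an Exchange hashpartitioning(host, ...),
    # so rows are not pinned to the machine whose name they carry.
    hosted.groupBy('host').count().explain()

That Exchange is the shuffle the replies above are pointing at.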
>>>
>>
>
