Mostly I'm guessing that it adds efficiency to a job where partitioning is required but shuffling is not.
For example, if I want to apply a UDF to 1TB of records on disk, I might need to repartition(50000) to get the task size down to something acceptable for my cluster. If I don't care that it's perfectly balanced, then I'd hope that I could save a lot of overhead with

    foo = (df.withColumn('randkey', F.floor(1000 * F.rand()))
             .repartition(5000, 'randkey', 'host')
             .apply(udf))

On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz <michaelea...@gmail.com> wrote:

> Well, if we think of shuffling as a necessity to perform an operation,
> then the problem would be that you are adding an aggregation stage to a
> job that is going to get shuffled anyway. Like if you need to join two
> datasets, then Spark will still shuffle the data, whether they are
> grouped by hostname prior to that or not. My question is: is there
> anything else that you would expect to gain, except for maybe enforcing
> a dataset that is already bucketed? You could enforce that data is where
> it is supposed to be, but what else would you avoid?
> Sent from my iPhone
>
> On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <pmccar...@dstillery.com> wrote:
>
>> When debugging some behavior on my YARN cluster I wrote the following
>> PySpark UDF to figure out what host was operating on what row of data:
>>
>>     @F.udf(T.StringType())
>>     def add_hostname(x):
>>         import socket
>>         return str(socket.gethostname())
>>
>> It occurred to me that I could use this to enforce node locality for
>> other operations:
>>
>>     df.withColumn('host', add_hostname(x)).groupBy('host').apply(...)
>>
>> When working on a big job without obvious partition keys, this seems
>> like a very straightforward way to avoid a shuffle, but it seems too
>> easy.
>>
>> What problems would I introduce by trying to partition on hostname
>> like this?

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
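The random-key trick in the thread can be sketched outside Spark: drawing each record's key uniformly from [0, 1000) yields roughly balanced buckets even when the data has no natural partition column. A minimal pure-Python simulation (the function name and record counts here are illustrative, not Spark APIs):

```python
import random
from collections import Counter

def assign_rand_keys(n_records, n_keys, seed=42):
    """Give each record a uniform random key, as F.floor(1000 * F.rand()) would."""
    rng = random.Random(seed)
    return [rng.randrange(n_keys) for _ in range(n_records)]

# 100k records spread over 1000 keys: every bucket lands near the mean of 100,
# so repartitioning on such a key produces evenly sized tasks.
counts = Counter(assign_rand_keys(100_000, 1_000))
print(len(counts), min(counts.values()), max(counts.values()))
```

Because the keys are uniform, partitioning on them spreads work evenly across tasks; the trade-off is that the deliberate shuffle to create those partitions is still paid once.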
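The hostname UDF at the bottom of the thread boils down to tagging each row with socket.gethostname() and grouping on that tag. A plain-Python sketch of the same idea on a single machine (the row layout and variable names are made up for illustration; no Spark involved):

```python
import socket
from collections import defaultdict

def add_hostname(_row):
    # Like the PySpark UDF: the input row is ignored; only the hostname
    # of the machine executing the call matters.
    return str(socket.gethostname())

rows = [{"id": i} for i in range(5)]
groups = defaultdict(list)
for row in rows:
    groups[add_hostname(row)].append(row)  # stand-in for groupBy('host')

# Run on one machine, every row gets the same hostname, so exactly one group.
print(len(groups))
```

On a real cluster each executor would report its own hostname, so the groups mirror where the scheduler happened to place the data — which is what makes a subsequent groupBy('host') tempting, and also what ties the result to scheduler placement rather than a stable key.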