I'm not 100% sure, but a plain repartition() appears to trigger a full shuffle of the data. If that's actually what's happening, it's just wasted overhead for this use case.
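One way to check (a minimal sketch, assuming a local SparkSession; the toy DataFrame is only for illustration) is to look at the physical plan, where repartition() should show up as an Exchange node:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Toy data; repartition(50) should add an Exchange
    # (RoundRobinPartitioning) node to the physical plan,
    # which is the shuffle in question.
    df = spark.range(1000)
    df.repartition(50).explain()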
On Tue, Aug 28, 2018 at 1:03 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> Hi Patrick,
>
> Sorry, is there something here that helps you beyond repartition(number of
> partitions) or calling your udf on foreachPartition? If your data is on
> disk, Spark is already partitioning it for you by rows. How is adding the
> host info helping?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
> <http://in.linkedin.com/in/sonalgoyal>
>
> On Tue, Aug 28, 2018 at 8:29 PM, Patrick McCarthy <pmccar...@dstillery.com.invalid> wrote:
>
>> Mostly I'm guessing that it adds efficiency to a job where partitioning
>> is required but shuffling is not.
>>
>> For example, if I want to apply a UDF to 1TB of records on disk, I might
>> need to repartition(50000) to get the task size down to an acceptable size
>> for my cluster. If I don't care that it's totally balanced, then I'd hope
>> that I could save a lot of overhead with
>>
>> foo = df.withColumn('randkey', F.floor(1000 * F.rand())).repartition(5000, 'randkey', 'host').apply(udf)
>>
>> On Mon, Aug 27, 2018 at 10:11 PM, Michael Artz <michaelea...@gmail.com> wrote:
>>
>>> Well, if we think of shuffling as a necessity to perform an operation,
>>> then the problem would be that you are adding an aggregation stage to a
>>> job that is going to get shuffled anyway. If you need to join two
>>> datasets, for example, Spark will still shuffle the data, whether they
>>> are grouped by hostname prior to that or not. My question is: is there
>>> anything else you would expect to gain, except maybe enforcing a dataset
>>> that is already bucketed? You could enforce that data is where it is
>>> supposed to be, but what else would you avoid?
>>>
>>> Sent from my iPhone
>>>
>>> > On Aug 27, 2018, at 12:22 PM, Patrick McCarthy <pmccar...@dstillery.com.INVALID> wrote:
>>> >
>>> > When debugging some behavior on my YARN cluster, I wrote the following
>>> > PySpark UDF to figure out which host was operating on which row of data:
>>> >
>>> > @F.udf(T.StringType())
>>> > def add_hostname(x):
>>> >     import socket
>>> >     return str(socket.gethostname())
>>> >
>>> > It occurred to me that I could use this to enforce node locality for
>>> > other operations (the UDF ignores its argument, so any column works):
>>> >
>>> > df.withColumn('host', add_hostname(F.lit(0))).groupBy('host').apply(...)
>>> >
>>> > When working on a big job without obvious partition keys, this seems
>>> > like a very straightforward way to avoid a shuffle, but it seems too easy.
>>> >
>>> > What problems would I introduce by trying to partition on hostname
>>> > like this?
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
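For completeness, here is a rough sketch of the foreachPartition/mapPartitions route Sonal mentions, which tags rows with the executor hostname without introducing any repartition (the input path and column names are made up for illustration):

    import socket

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/path/to/data")  # hypothetical input

    def tag_partition(rows):
        # Runs once per existing partition, on whichever executor
        # already holds it; no shuffle is introduced.
        host = socket.gethostname()
        for row in rows:
            yield row + (host,)

    tagged = spark.createDataFrame(
        df.rdd.mapPartitions(tag_partition),
        df.columns + ['host'],
    )

Whether the existing on-disk partitions are small enough to process without a repartition is, of course, the open question from the 1TB example above.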