Hi, I do not think we need to do any of that. Please try repartitionByRange; Spark 3 also has adaptive query execution, with configuration settings to handle skew.
Regards,
Gourav

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson <aedav...@ucsc.edu> wrote:

Hi Gourav

When I join I get an OOM. To address this, my thought was to split my tables into small batches of rows, join each batch together, then use union. My assumption is that union is a narrow transform and as such requires fewer resources. Say I have 5 data frames I want to join together and each has 300 rows.

I want to create 15 data frames:

Set1 = {11, 12, 13, 14, 15}
Set2 = {21, 22, 23, 24, 25}
Set3 = {31, 32, 33, 34, 35}

Then join each "batch":

S1joinDF = 11.join(12).join(13).join(14).join(15)
S2joinDF = 21.join(22).join(23).join(24).join(25)
S3joinDF = 31.join(32).join(33).join(34).join(35)

resultDF = S1joinDF.union(S2joinDF).union(S3joinDF)

Okay, so that is the background. I originally wrote my code as follows. Based on my unit test, it turns out I need to call orderBy on every iteration of the while loop; sorting once outside the loop did not resolve the problem. Given the size of my data frames, that is going to crush performance. My unit test works; I never ran it on my real data set.

# Create a copy of the original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()

i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` number of rows.
    # Note: take() is an action, limit() is a transformation.
    topDF = copyDF.limit(numRows)

    # Truncate copyDF to remove the contents fetched for topDF.
    # The original quant.sf files are sorted by name; however, we
    # must use orderBy, else the row names between GTEx samples
    # will not be the same. We cannot simply sort or orderBy once;
    # we have to do this on every iteration.
    copyDF = copyDF.subtract(topDF).orderBy("Name")

    retList[i] = topDF

    # Increment the split number
    i += 1

if remainingRows > 0:
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    # copyDF.show()
    # retList[i].show()

Rather than use orderBy, I thought that if I could add a row number I could easily split up my data frames. My code would look a lot like what I would write in pandas or R:

while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[start:end]

There does not seem to be an easy way to do this.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
"The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive."

Comments and suggestions appreciated

Andy

From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How to add a row number column without reordering my data frame

Hi,

I am a bit confused here. It is not entirely clear to me why you are creating the row numbers, and how creating the row numbers helps you with the joins?

Can you please explain with some sample data?

Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi

I am trying to work through an OOM error. I have 10411 files. I want to select a single column from each file and then join them into a single table.

The files have a unique row id.
However, it is a very long string. The data file with just the name and the column of interest is about 470 MB. The column of interest alone is 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join over row numbers.

# create a dummy variable to orderBy
# https://www.py4u.net/discuss/1840945
w = Window().orderBy(lit('A'))
sampleDF = sampleDF.select(["NumReads"])\
    .withColumnRenamed("NumReads", sampleName)\
    .withColumn("tid", row_number().over(w))

This code seems pretty complicated to someone coming from pandas and R data frames. My unit test works; however, it generates the following warning:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Is there a better way to create a row number without reordering my data? The order is important.

Kind regards

Andy