Hi,
I do not think we need to do any of that. Please try repartitionByRange;
Spark 3 also has adaptive query execution, with configurations to handle
skewed joins.
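
For example, a rough sketch (the join key "Name", the partition count, and
the data frame names below are just placeholders):

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

    # range-partition both sides on the join key before joining
    leftDF = leftDF.repartitionByRange(200, "Name")
    rightDF = rightDF.repartitionByRange(200, "Name")
    joinedDF = leftDF.join(rightDF, on="Name")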

Regards,
Gourav

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson <aedav...@ucsc.edu> wrote:

> Hi Gourav,
>
>
>
> When I join I get an OOM. To address this, my thought was to split my
> tables into small batches of rows, join each batch, and then combine the
> batches with union. My assumption is that union is a narrow transformation
> and as such requires fewer resources. Say I have 5 data frames that I want
> to join together, each with 300 rows.
>
>
>
> I want to create 15 data frames.
>
>
>
> Set1 = {11, 12, 13, 14, 15}
>
>
>
> Set2 = {21, 22, 23, 24, 25}
>
>
>
> Set3 = {31, 32, 33, 34, 35}
>
>
>
> Then join each “batch” (a more concrete sketch follows below):
>
> S1joinDF = 11.join(12).join(13).join(14).join(15)
>
>
>
> S2joinDF = 21.join(22).join(23).join(24).join(25)
>
>
>
> S3joinDF = 31.join(32).join(33).join(34).join(35)
>
>
>
> resultDF = S1joinDF.union( S2joinDF ).union( S3joinDF )
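>
> As an illustration only, assuming each numbered piece above is a DataFrame
> (df11, df12, ... below) and the shared join key is "Name", the idea could
> be written so it scales to many batches:
>
> from functools import reduce
>
> def joinBatch(dfs):
>     # join one batch of data frames left to right on the shared key
>     return reduce(lambda left, right: left.join(right, on="Name"), dfs)
>
> batches = [[df11, df12, df13, df14, df15],
>            [df21, df22, df23, df24, df25],
>            [df31, df32, df33, df34, df35]]
>
> joinedBatches = [joinBatch(b) for b in batches]
> resultDF = reduce(lambda a, b: a.union(b), joinedBatches)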
>
>
>
> Here is how I originally wrote my code. Based on my unit test, it turns
> out I need to call orderBy on every iteration of the loop; I tried sorting
> outside of the while loop, but that did not resolve the problem. Given the
> size of my dataframes, that is going to crush performance. My unit test
> works; I never ran it on my real data set.
>
>
>
> # Create a copy of the original dataframe
> copyDF = df.orderBy("Name")
> # copyDF.show()
>
> i = 0
> while i < numberOfSplits:
>     self.logger.warn("i:{}".format(i))
>
>     # Get the top `numRows` rows.
>     # note: take() is an action, limit() is a transformation
>     topDF = copyDF.limit(numRows)
>
>     # Truncate copyDF to remove the rows fetched into topDF.
>     # The original quant.sf files are sorted by Name; however, we must use
>     # orderBy, else the row names between GTEx samples will not line up.
>     # We cannot simply sort or orderBy once; we have to do this on every
>     # iteration.
>     copyDF = copyDF.subtract(topDF).orderBy("Name")
>
>     retList[i] = topDF
>
>     # Increment the split number
>     i += 1
>
> if remainingRows > 0:
>     self.logger.info("AEDWIP writing last i:{} len(retList):{}"
>                      .format(i, len(retList)))
>     retList[i] = copyDF
>     # copyDF.show()
>     # retList[i].show()
>
> Okay, so that is the background. Rather than use orderBy, I thought that
> if I could add a row number I could easily split up my data frames. My
> code would look a lot like what I would write in pandas or R:
>
>
>
> while i < numBatches:
>     start = i * numRows
>     end = start + numRows
>     print("\ni:{} start:{} end:{}".format(i, start, end))
>     df = trainDF.iloc[start:end]
>
>
>
> There does not seem to be an easy way to do this.
>
>
> https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
>
> The generated ID is guaranteed to be monotonically increasing and unique,
> but not consecutive.
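>
> For what it is worth, a quick sketch of that (sampleDF and the "tid"
> column are the same as in my original code below):
>
> from pyspark.sql.functions import monotonically_increasing_id
>
> sampleDF = sampleDF.withColumn("tid", monotonically_increasing_id())
> # the ids increase with the current row order and need no sort or shuffle,
> # but they have gaps, so they cannot be used directly as the 0..N-1 row
> # numbers I need to join on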
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>
> From: Gourav Sengupta <gourav.sengu...@gmail.com>
> Date: Monday, January 10, 2022 at 11:03 AM
> To: Andrew Davidson <aedav...@ucsc.edu.invalid>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: How to add a row number column without reordering my data
> frame
>
>
>
> Hi,
>
>
>
> I am a bit confused here: it is not entirely clear to me why you are
> creating the row numbers, and how creating the row numbers helps you with
> the joins.
>
>
>
> Can you please explain with some sample data?
>
>
>
>
>
> Regards,
>
> Gourav
>
>
>
> On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson <aedav...@ucsc.edu.invalid>
> wrote:
>
> Hi
>
>
>
> I am trying to work through an OOM error. I have 10411 files. I want to
> select a single column from each file and then join them into a single
> table.
>
>
>
> The files have a unique row id; however, it is a very long string. The
> data file with just the name and the column of interest is about 470 MB.
> The column of interest alone is about 21 MB; it is a column of over
> 5 million real numbers.
>
>
>
> So I thought I would save a lot of memory if I could join over row numbers.
>
>
>
> # create dummy variable to orderBy; see https://www.py4u.net/discuss/1840945
> from pyspark.sql import Window
> from pyspark.sql.functions import lit, row_number
>
> w = Window.orderBy(lit('A'))
> sampleDF = sampleDF.select(["NumReads"]) \
>                    .withColumnRenamed("NumReads", sampleName) \
>                    .withColumn("tid", row_number().over(w))
>
>
>
>
>
> This code seems pretty complicated to someone coming from pandas and R
> dataframes. My unit test works; however, it generates the following
> warning:
>
>
>
>
>
> WARN WindowExec: No Partition Defined for Window operation! Moving all
> data to a single partition, this can cause serious performance degradation.
>
>
>
>
>
> Is there a better way to create a row number without reordering my data?
> The order is important.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
