Re: How to add a row number column without reordering my data frame
Thanks! I will take a look.

Andy
Re: How to add a row number column without reordering my data frame
Hi,

I do not think we need to do any of that. Please try repartitionByRange; Spark 3 has adaptive query execution, with configurations to handle skew as well.

Regards,
Gourav
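P.S. A minimal sketch of the idea, in case it helps. The data frame names, the "Name" join key, and the partition count below are placeholders, not taken from your code:

    # turn on adaptive query execution and its skew-join handling (Spark 3+)
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

    # range-partition both sides on the join key before the join, so
    # matching keys land in the same partition ranges
    left = leftDF.repartitionByRange(200, "Name")
    right = rightDF.repartitionByRange(200, "Name")
    joined = left.join(right, on="Name")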
Re: How to add a row number column without reordering my data frame
Hi Gourav,

When I join I get an OOM error. To address this, my thought was to split my tables into small batches of rows, join each batch, and then use union. My assumption is that union is a narrow transformation and as such requires fewer resources. Let's say I have 5 data frames I want to join together and each has 300 rows. I want to create 15 data frames:

Set1 = {11, 12, 13, 14, 15}

Set2 = {21, 22, 23, 24, 25}

Set3 = {31, 32, 33, 34, 35}

Then I join each batch:

    S1joinDF = 11.join(12).join(13).join(14).join(15)
    S2joinDF = 21.join(22).join(23).join(24).join(25)
    S3joinDF = 31.join(32).join(33).join(34).join(35)

    resultDF = S1joinDF.union( S2joinDF ).union( S3joinDF )

Here is how I originally wrote my code. Based on my unit test, it turns out I need to call orderBy on every iteration of the loop; sorting once outside the while loop did not resolve the problem. Given the size of my data frames, that is going to crush performance. My unit test works, but I never ran it on my real data set.

    # Create a copy of the original dataframe
    copyDF = df.orderBy("Name")
    # copyDF.show()

    i = 0
    while i < numberOfSplits:
        self.logger.warn("i:{}".format(i))
        # Get the top `numRows` rows.
        # Note: take() is an action; limit() is a transformation.
        topDF = copyDF.limit( numRows )

        # Truncate copyDF to remove the contents fetched for topDF.
        # The original quant.sf files are sorted by name; however, we
        # must use orderBy, else the row names between GTEx samples
        # will not be the same. We cannot simply sort or orderBy once;
        # we have to do this on every iteration.
        copyDF = copyDF.subtract(topDF).orderBy( "Name" )

        retList[i] = topDF

        # Increment the split number
        i += 1

    if remainingRows > 0:
        self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
        retList[i] = copyDF
        # copyDF.show()
        # retList[i].show()

OK, so that is the background. Rather than use orderBy, I thought that if I could add a row number I could easily split up my data frames. My code would look a lot like what I would write in pandas or R:

    while i < numBatches:
        start = i * numRows
        end = start + numRows
        print("\ni:{} start:{} end:{}".format(i, start, end))
        df = trainDF.iloc[ start:end ]

There does not seem to be an easy way to do this in Spark. The closest I have found is monotonically_increasing_id:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html

"The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive."

Comments and suggestions appreciated.

Andy
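P.S. One possibility I have been looking at, sketched below and untested on my real data, is RDD zipWithIndex, which does assign consecutive 0-based ids in the data frame's current order without a global sort. The "row_id" column name is my own placeholder:

    from pyspark.sql.types import LongType, StructField, StructType

    # pair each row with a consecutive index; zipWithIndex preserves
    # the existing partition order, so no window or sort is involved
    # (it does trigger one extra job to compute partition offsets)
    rddWithId = df.rdd.zipWithIndex()

    # extend the original schema with the new id column
    schemaWithId = StructType(df.schema.fields +
                              [StructField("row_id", LongType(), False)])

    dfWithId = spark.createDataFrame(
        rddWithId.map(lambda pair: tuple(pair[0]) + (pair[1],)),
        schemaWithId)

With a consecutive row_id in place, each batch would just be a filter on a range, for example dfWithId.filter("row_id >= {} and row_id < {}".format(start, end)), much like the iloc slicing above.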
Re: How to add a row number column without reordering my data frame
Hi,

I am a bit confused here. It is not entirely clear to me why you are creating the row numbers, or how creating the row numbers helps you with the joins.

Can you please explain with some sample data?

Regards,
Gourav
How to add a row number column without reordering my data frame
Hi,

I am trying to work through an OOM error. I have 10411 files. I want to select a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data file with just the name and the column of interest is about 470 MB. The column of interest alone is 21 MB; it is a column of over 5 million real numbers. So I thought I would save a lot of memory if I can join over row numbers.

    # create a dummy variable to order by
    # https://www.py4u.net/discuss/1840945
    from pyspark.sql.functions import lit, row_number
    from pyspark.sql.window import Window

    w = Window().orderBy(lit('A'))
    sampleDF = sampleDF.select( ["NumReads"] )\
                       .withColumnRenamed( "NumReads", sampleName )\
                       .withColumn( "tid", row_number().over(w) )

This code seems pretty complicated to someone coming from pandas and R dataframes. My unit test works; however, it generates the following warning:

    WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

Is there a better way to create a row number without reordering my data? The order is important.

Kind regards,

Andy
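P.S. One alternative I am considering, sketched below, is monotonically_increasing_id, which assigns ids without moving all the data to a single partition. My assumption, which I am not sure holds, is that a unique but non-consecutive id is still usable as a join key, and that the ids only line up across samples if every file is read with identical partitioning:

    from pyspark.sql.functions import monotonically_increasing_id

    # unique 64-bit id per row; ids increase with partition order but
    # are NOT consecutive, and no single-partition window is needed
    sampleDF = sampleDF.select("NumReads") \
                       .withColumnRenamed("NumReads", sampleName) \
                       .withColumn("tid", monotonically_increasing_id())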