Re: How to add a row number column without reordering my data frame

2022-01-11 Thread Andrew Davidson
Thanks!

I will take a look

Andy

From: Gourav Sengupta
Date: Tuesday, January 11, 2022 at 8:42 AM
To: Andrew Davidson
Cc: Andrew Davidson, "user @spark"
Subject: Re: How to add a row number column without reordering my data frame

Hi,
I do not think we need to do any of that. Please try repartitionByRange; Spark
3 has adaptive query execution with configurations to handle skew as well.

Regards,
Gourav
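
A minimal sketch of that suggestion, assuming Spark 3.x; the input paths, the
join key "Name", and the partition count of 200 are placeholders, not values
from this thread:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("skew-demo")
         # AQE is on by default in Spark 3.2+; enable it explicitly on older 3.x
         .config("spark.sql.adaptive.enabled", "true")
         # let AQE split oversized partitions during sort-merge joins
         .config("spark.sql.adaptive.skewJoin.enabled", "true")
         .getOrCreate())

# Range-partition both sides on the join key so each partition holds a
# contiguous, roughly equal-sized slice of the key space before the join.
left = spark.read.parquet("left.parquet").repartitionByRange(200, "Name")
right = spark.read.parquet("right.parquet").repartitionByRange(200, "Name")
joinedDF = left.join(right, "Name")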

On Tue, Jan 11, 2022 at 4:21 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
Hi Gourav

When I join I get an OOM. To address this, my thought was to split my tables
into small batches of rows, join each batch together, and then combine the
batches with union. My assumption is that union is a narrow transformation
and as such requires fewer resources. Say I have 5 data frames I want to join
together and each has 300 rows.

I want to create 15 data frames (3 row batches from each of the 5 tables).

Set1 = {df11, df12, df13, df14, df15}

Set2 = {df21, df22, df23, df24, df25}

Set3 = {df31, df32, df33, df34, df35}

Then I join each batch on the shared row-id column and union the results:

S1joinDF = df11.join(df12, "Name").join(df13, "Name").join(df14, "Name").join(df15, "Name")

S2joinDF = df21.join(df22, "Name").join(df23, "Name").join(df24, "Name").join(df25, "Name")

S3joinDF = df31.join(df32, "Name").join(df33, "Name").join(df34, "Name").join(df35, "Name")

resultDF = S1joinDF.union(S2joinDF).union(S3joinDF)
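
A sketch of how this pattern could generalize in PySpark with
functools.reduce, assuming each dfNN above is a DataFrame slice that shares
the row-id column "Name":

from functools import reduce
from pyspark.sql import DataFrame

def joinBatch(dfs, key="Name"):
    # fold a batch of narrow DataFrames into one wide DataFrame on the key
    return reduce(lambda a, b: a.join(b, key), dfs)

set1 = [df11, df12, df13, df14, df15]
set2 = [df21, df22, df23, df24, df25]
set3 = [df31, df32, df33, df34, df35]

# join within each row batch, then stack the joined batches back together
resultDF = reduce(DataFrame.union, [joinBatch(s) for s in (set1, set2, set3)])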

The way I originally wrote my code is as follows. Based on my unit test, it
turns out I need to call orderBy on every iteration of the loop; I tried
sorting once outside of the while loop, but that did not resolve the problem.
Given the size of my dataframes, that is going to crush performance. My unit
test works; I never ran it on my real data set.

# Create a copy of the original dataframe
copyDF = df.orderBy("Name")
# copyDF.show()

i = 0
while i < numberOfSplits:
    self.logger.warn("i:{}".format(i))
    # Get the top `numRows` rows.
    # note: take() is an action, limit() is a transformation
    topDF = copyDF.limit(numRows)

    # Truncate `copyDF` to remove the contents fetched into `topDF`.
    # The original quant.sf files are sorted by name; however, we must
    # use orderBy, else the row names between GTEx samples will not
    # match up. We can not simply sort or orderBy once; we have to do
    # this on every iteration.
    copyDF = copyDF.subtract(topDF).orderBy("Name")

    retList[i] = topDF

    # Increment the split number
    i += 1

if remainingRows > 0:
    self.logger.info("AEDWIP writing last i:{} len(retList):{}".format(i, len(retList)))
    retList[i] = copyDF
    # copyDF.show()
    # retList[i].show()


Okay, so that is the background. Rather than use orderBy, I thought that if I
could add a row number I could easily split up my data frames. My code would
look a lot like what I would write in pandas or R:

while i < numBatches:
    start = i * numRows
    end = start + numRows
    print("\ni:{} start:{} end:{}".format(i, start, end))
    df = trainDF.iloc[start:end]
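
For comparison, once a consecutive row-number column exists, the Spark
analogue of iloc is a cheap filter; a sketch assuming a column named tid (the
name used in the code further below) and the same loop variables:

from pyspark.sql import functions as F

# slice batch i with a filter instead of limit()/subtract()/orderBy()
start = i * numRows
end = start + numRows
batchDF = trainDF.where((F.col("tid") >= start) & (F.col("tid") < end))

The hard part is producing such a consecutive tid column in the first place.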

There does not seem to be an easy way to do this. The closest built-in is
monotonically_increasing_id():
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html
"The generated ID is guaranteed to be monotonically increasing and unique, but
not consecutive."
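
One possible workaround, a sketch assuming a DataFrame df whose existing
partition order is the row order you want to keep: RDD zipWithIndex() assigns
consecutive, 0-based ids without a global sort.

from pyspark.sql import Row

# zipWithIndex() pairs each Row with a consecutive 0-based long id assigned
# in partition order -- no global sort, no single-partition shuffle; it does
# run one extra job to count the rows in each partition
indexedDF = (df.rdd
             .zipWithIndex()
             .map(lambda pair: Row(tid=pair[1], **pair[0].asDict()))
             .toDF())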


Comments and suggestions appreciated

Andy


From: Gourav Sengupta <gourav.sengu...@gmail.com>
Date: Monday, January 10, 2022 at 11:03 AM
To: Andrew Davidson
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How to add a row number column without reordering my data frame

Hi,

I am a bit confused here; it is not entirely clear to me why you are creating
the row numbers, and how creating the row numbers helps you with the joins.

Can you please explain with some sample data?


Regards,
Gourav

On Fri, Jan 7, 2022 at 1:14 AM Andrew Davidson  
wrote:
Hi

I am trying to work through an OOM error. I have 10411 files. I want to select
a single column from each file and then join them into a single table.

The files have a unique row id; however, it is a very long string. The data
file with just the name and the column of interest is about 470 MB. The column
of interest alone is 21 MB; it is a column of over 5 million real numbers.

So I thought I would save a lot of memory if I could join over row numbers.

# create a dummy variable to orderBy: https://www.py4u.net/discuss/1840945
from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number

w = Window.orderBy(lit("A"))
sampleDF = sampleDF.select(["NumReads"]) \
    .withColumnRenamed("NumReads", sampleName) \
    .withColumn("tid", row_number().over(w))


This code seems pretty complicated to someone coming from pandas and R
dataframes. My unit test works; however, it generates the following warning:



WARN WindowExec: No Partition Defined for Window operation! Moving all data to
a single partition, this can cause serious performance degradation.


Is there a better way to create a row number without reordering my data? The
order is important.

Kind regards

Andy
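
A sketch of one scalable alternative to the single-partition window, under the
same assumption as the zipWithIndex() sketch above (that the existing
partition order is the row order to preserve): number rows within each
partition in parallel, then shift each partition's numbers by the sizes of the
partitions before it.

from pyspark.sql import Window, functions as F

df = sampleDF.withColumn("pid", F.spark_partition_id())

# 1) number rows within each partition; monotonically_increasing_id() is
#    increasing within a partition, so this keeps the original row order
wIn = Window.partitionBy("pid").orderBy(F.monotonically_increasing_id())
df = df.withColumn("localRow", F.row_number().over(wIn))

# 2) cumulative offset per partition; the counts table has one row per
#    partition, so its single-partition window is harmless
wOff = Window.orderBy("pid").rowsBetween(Window.unboundedPreceding, -1)
offsets = (df.groupBy("pid").count()
             .withColumn("offset", F.coalesce(F.sum("count").over(wOff), F.lit(0))))

# 3) consecutive global row number = local number + offset
indexedDF = (df.join(offsets.select("pid", "offset"), "pid")
               .withColumn("tid", F.col("localRow") + F.col("offset"))
               .drop("pid", "localRow", "offset"))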
