Re: Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Sonal Goyal
Hi Andrew,

Do you think the following would work?

Build the data frames by appending a source column (the sampleName) to each.
Add the remaining columns as per the quantSchema. Then union them, so you get
one data frame with many entries per Name. You can then use window functions
over it.
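
Roughly something like this, as an untested sketch. It assumes each quant file
has Name and NumReads columns and that fileList and sampleNamesList line up; I
have used a pivot on the unioned frame here, though a window function over Name
would work on the same long frame:

    from functools import reduce
    from pyspark.sql import functions as F

    dfs = []
    for quantFile, sampleName in zip(fileList, sampleNamesList):
        df = (spark.read.csv(quantFile, sep="\t", header=True, schema=quantSchema)
                   .select("Name", "NumReads")
                   .withColumn("source", F.lit(sampleName)))
        dfs.append(df)

    # one long, narrow frame: (Name, NumReads, source)
    longDF = reduce(lambda a, b: a.unionByName(b), dfs)

    # one column per sample; the wide shape comes from a single shuffle
    # instead of one join per file
    wideDF = longDF.groupBy("Name").pivot("source").agg(F.first("NumReads"))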

On Tue, 4 Jan 2022 at 6:29 AM, Andrew Davidson wrote:

> Hi David
>
> I need to select 1 column from many files and combine them into a single
> table.
>
> [...]

Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Andrew Davidson
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data 
frames.
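
A toy illustration of what I mean, with made-up values:

    a = spark.createDataFrame([("gene1", 10.0)], ["Name", "NumReads"])
    b = spark.createDataFrame([("gene1", 20.0)], ["Name", "NumReads"])

    a.union(b).show()                                  # 2 rows, still 2 columns
    a.join(b.withColumnRenamed("NumReads", "sample2"),
           on="Name", how="inner").show()              # 1 row, 3 columns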

I think you are correct that lazy evaluation over a lot of joins may make the
execution plan too complicated. To debug I added

logger.warn( "i:{}, num file rows:{} num joined rows:{}".format( i, df.count(),
retDF.count() ) )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making
some progress, however it fails after a few files. Resources are maxed out!
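
For reference, the setting can be applied either on the session builder or on a
live session; a sketch, assuming a SparkSession named spark:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.sql.autoBroadcastJoinThreshold", -1)
             .getOrCreate())

    # or, on an existing session:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)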



I estimated that the raw data should be < 500 GB. I am running a cluster with
2.8 TB, which should be more than enough to cover Spark's overhead.



Is Spark integrated with the Python garbage collector?



I assume createOrReplaceTempView() would cause the cache to get flushed as needed?



Kind regards



Andy



###
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info( "BEGIN" )
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "

    for i in range( len(self.fileList) ):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                         schema=quantSchema, header="true" )
        # did not fix bug .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                           .withColumnRenamed( "NumReads", sampleName )

        sampleDF.createOrReplaceTempView( "sample" )

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(),
                                 len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to the table of reads
        #

        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                        from \n\
                            retNumReadsDF as rc, \n\
                            sample \n\
                        where \n\
                            rc.Name == sample.Name \n'.format( sampleName )

        self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )

        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql( sqlStmt )

        retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

        #
        # debug. seems like we do not make progress when we run on training:
        # nothing happens, logs do not change, cluster metrics drop suggesting
        # no work is being done. add an action to try and debug.
        # this should not change the physical plan, i.e. we still have the same
        # number of shuffles, which results in the same number of stages. We are
        # just not building up a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(),
                                 len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        #

        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info( "END\n" )
    return retNumReadsDF


From: David Diebold 
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson , "user @spark" 

Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size and of each individual dataset's
size?
Having a look at the execution plan would help; maybe the high number of join
operations makes the execution plan too complicated at the end of the day.
Checkpointing could help there?
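
For instance, something along these lines every N joins would keep the lineage
short. A rough sketch, where the checkpoint directory, the list sampleDFs and
the interval of 10 are just examples:

    spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

    retDF = None
    for i, sampleDF in enumerate(sampleDFs):
        retDF = sampleDF if i == 0 else retDF.join(sampleDF, on="Name", how="inner")
        if i > 0 and i % 10 == 0:
            # checkpoint() materializes the frame and truncates its lineage,
            # so the plan does not keep growing with every join
            retDF = retDF.checkpoint()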

Cheers,
David


On Thu, 30 Dec 2021 at 16:56, Andrew Davidson wrote:
Hi Gourav

I will give Databricks a try.

Each dataset gets loaded into a data frame.
I select one column from the data frame.
I join the column to the accumulated joins from previous data frames in the