Re: Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Sonal Goyal
Hi Andrew,

Do you think the following would work?

Build the data frames by appending a source column to each (the sampleName).
Add the extra columns as per the quantSchema schema. Then union them, so you
get one data frame with many entries per Name. You can then use window
functions over them.
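
A rough, untested sketch of that idea (fileList, sampleNamesList, quantSchema
and spark are assumed to come from Andy's driver code below; I use a
groupBy/pivot rather than a window to get back to one column per sample):

from functools import reduce
from pyspark.sql import functions as F

# build one long data frame: (Name, NumReads, source) per sample, then union
dfs = []
for path, sampleName in zip(fileList, sampleNamesList):
    df = (spark.read.csv(path, sep="\t", header=True, schema=quantSchema)
               .select("Name", "NumReads")
               .withColumn("source", F.lit(sampleName)))
    dfs.append(df)

longDF = reduce(lambda a, b: a.unionByName(b), dfs)

# one row per Name, one column per sample; replaces the chain of N-1 joins
wideDF = longDF.groupBy("Name").pivot("source").agg(F.first("NumReads"))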

On Tue, 4 Jan 2022 at 6:29 AM, Andrew Davidson 
wrote:

> Hi David
>
> I need to select 1 column from many files and combine them into a single
> table.
>
> I do not believe union() will work. It appends rows, not columns.
>
> As far as I know join() is the only way to append columns from different
> data frames.
>
> I think you are correct that lazy evaluation over a lot of joins may make
> the execution plan too complicated. To debug I added
>
> logger.warn( "i:{}, num file rows:{} num joined rows:{}".format(i,
> df.count(), retDF.count()) )
>
> to try and simplify the execution plan.
>
> Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started
> making some progress, however it fails after a few files. Resources are
> maxed out!
>
> I estimated that the raw data should be < 500 GB. I am running a cluster
> with 2.8 TB, which should be more than enough to cover Spark overhead.
>
> Is Spark integrated with the Python garbage collector?
>
> I assume createOrReplaceTempView() would cause the cache to get flushed as
> needed?
>
> Kind regards
>
> Andy

Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Andrew Davidson
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data 
frames.
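
In DataFrame-API terms that is roughly the sketch below (untested; fileList,
sampleNamesList, quantSchema and spark are assumed from _loadSalmonReadsTable
further down):

from pyspark.sql import functions as F

retDF = None
for path, sampleName in zip(fileList, sampleNamesList):
    sampleCol = (spark.read.csv(path, sep="\t", header=True, schema=quantSchema)
                      .select("Name", F.col("NumReads").alias(sampleName)))
    # append one column per sample by joining on Name
    retDF = sampleCol if retDF is None else retDF.join(sampleCol, on="Name")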

I think you are correct that lazy evaluation over a lot of joins may make the
execution plan too complicated. To debug I added

logger.warn( "i:{}, num file rows:{} num joined rows:{}".format(i, df.count(),
retDF.count()) )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making
some progress, however it fails after a few files. Resources are maxed out!
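
That setting can be applied either on the live session or at submit time; a
minimal sketch:

# disable automatic broadcast joins; every join is then planned as a shuffle join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# equivalent at submit time:
#   spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...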



I estimated that the raw data should be < 500 GB. I am running a cluster with
2.8 TB, which should be more than enough to cover Spark overhead.



Is Spark integrated with the Python garbage collector?



I assume createOrReplaceTempView() would cause the cache to get flushed as needed?
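
If that assumption is wrong, I could manage the cache explicitly instead; a
rough, untested sketch using the names from the code below:

# caching and releasing storage are explicit; the temp view is just a name
retNumReadsDF.createOrReplaceTempView("retNumReadsDF")
retNumReadsDF.cache()     # only worthwhile if the frame is reused
retNumReadsDF.count()     # action that materializes the cache

# later, free executor storage and drop the name explicitly instead of
# relying on the Python garbage collector
retNumReadsDF.unpersist()
spark.catalog.dropTempView("retNumReadsDF")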



Kind regards



Andy



###
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info( "BEGIN" )
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE "

    for i in range( len(self.fileList) ):
        #
        # get NumReads from next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                         schema=quantSchema, header="true" )
        # did not fix bug .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                           .withColumnRenamed( "NumReads", sampleName )

        sampleDF.createOrReplaceTempView( "sample" )

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(),
                                 len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to table of reads
        #

        # the sample name must be quoted else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                        from \n\
                            retNumReadsDF as rc, \n\
                            sample \n\
                        where \n\
                            rc.Name == sample.Name \n'.format( sampleName )

        self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
        if i == 0 :
            retNumReadsDF = sampleDF
        else :
            retNumReadsDF = self.spark.sql( sqlStmt )

        retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

        #
        # debug. seems like we do not make progress when we run on training
        # nothing happens, logs do not change, cluster metrics drop suggesting
        # no work is being done
        # add an action to try and debug
        # this should not change the physical plan. I.e. we still have the same
        # number of shuffles, which results in the same number of stages. We are
        # just not building up a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(),
                                 len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        #

        # execution plan should be the same for each join
        #rawCountsSDF.explain()

    self.logger.info( "END\n" )
    return retNumReadsDF
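
A tiny standalone illustration of the backtick quoting mentioned in the
comments above (the column name is the hypothetical one from the comment):

df = spark.createDataFrame([("gene1", 10.0)], ["Name", "1117F-0426-SM-5EGHI"])
df.createOrReplaceTempView("t")

# without backticks Spark parses the '-' as subtraction and fails to resolve
# the columns; with backticks the whole string is treated as one column name
spark.sql("select Name, `1117F-0426-SM-5EGHI` from t").show()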


From: David Diebold 
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson , "user @spark" 

Subject: Re: Pyspark debugging best practices


Re: Pyspark debugging best practices

2022-01-03 Thread David Diebold
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple
unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual dataset
size?
Having a look at the execution plan would help; maybe the high number of join
operations makes the execution plan too complicated at the end of the day.
Checkpointing could help there?
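
Something like the untested sketch below, maybe (the checkpoint directory and
the every-10-files frequency are placeholders; fileList, sampleNamesList and
quantSchema are assumed from Andy's code):

from pyspark.sql import functions as F

# placeholder location; on Dataproc a GCS bucket would be typical
spark.sparkContext.setCheckpointDir("gs://my-bucket/spark-checkpoints")

retNumReadsDF = None
for i, (path, sampleName) in enumerate(zip(fileList, sampleNamesList)):
    sampleDF = (spark.read.csv(path, sep="\t", header=True, schema=quantSchema)
                     .select("Name", F.col("NumReads").alias(sampleName)))
    retNumReadsDF = (sampleDF if retNumReadsDF is None
                     else retNumReadsDF.join(sampleDF, on="Name"))
    if i > 0 and i % 10 == 0:
        # materialize the result and truncate the lineage so the plan stops growing
        retNumReadsDF = retNumReadsDF.checkpoint(eager=True)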

Cheers,
David


On Thu, 30 Dec 2021 at 16:56, Andrew Davidson wrote:

> Hi Gourav
>
> I will give Databricks a try.
>
> Each dataset gets loaded into a data frame.
> I select one column from the data frame.
> I join that column to the accumulated joins from the previous data frames in
> the list.
>
> To debug, I think I am going to put an action and log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know whether it is making progress or not, and
> hopefully where it is breaking.
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
> wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me; why are you adding
>> dataframes to a list?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
>> wrote:
>>
>>> Hi
>>>
>>>
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data sets but fails on large ones. It is very hard to figure out what the
>>> bug is. I suspect it may have something to do with the way Spark is
>>> installed and configured. I am using Google Cloud Platform Dataproc PySpark.
>>>
>>>
>>>
>>> The log messages are not helpful. The error message will be something
>>> like
>>> "User application exited with status 1"
>>>
>>>
>>>
>>> And
>>>
>>>
>>>
>>> jsonPayload: {
>>>
>>> class: "server.TThreadPoolServer"
>>>
>>> filename: "hive-server2.log"
>>>
>>> message: "Error occurred during processing of message."
>>>
>>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>>
>>> }
>>>
>>>
>>>
>>> I am able to access the Spark history server; however, it does not capture
>>> anything if the driver crashes. I am unable to figure out how to access the
>>> Spark web UI.
>>>
>>>
>>>
>>> My driver program looks something like the pseudo code below: a long
>>> list of transforms with a single action (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluation. I am tempted to
>>> add something like
>>>
>>> Logger.warn( "DEBUG df.count():{}".format( df.count() ) )
>>>
>>> to try and inline some sort of diagnostic message.
>>>
>>>
>>>
>>> What do you think?
>>>
>>>
>>>
>>> Is there a better way to debug this?
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> def run():
>>>     listOfDF = []
>>>     for filePath in listOfFiles:
>>>         df = spark.read.load( filePath, ...)
>>>         listOfDF.append(df)
>>>
>>>     list2OfDF = []
>>>     for df in listOfDF:
>>>         df2 = df.select(  )
>>>         list2OfDF.append( df2 )
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory
>>>     listOfDF = None
>>>
>>>     df3 = list2OfDF[0]
>>>
>>>     for i in range( 1, len(list2OfDF) ):
>>>         df = list2OfDF[i]
>>>         df3 = df3.join(df ...)
>>>
>>>     # will setting the list to None free cache?
>>>     # or just driver memory
>>>     list2OfDF = None
>>>
>>>     # lots of narrow transformations on df3
>>>
>>>     return df3
>>>
>>> def main() :
>>>     df = run()
>>>     df.write()
>>>
>>>
>>>
>>