Hi David
I need to select 1 column from many files and combine them into a single table.
I do not believe union() will work. It appends rows, not columns.
As far as I know join() is the only way to append columns from different data
frames.
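Roughly what I mean, as a minimal sketch (the file names and the "sample1"/"sample2" column names below are just placeholders):

# union() stacks rows: both inputs must have the same schema and we end up with more rows
# stacked = df1.union(df2)

# join() is what adds columns: one value column per file, matched on "Name"
df1 = spark.read.csv("quant1.tsv", sep="\t", header=True)
df2 = spark.read.csv("quant2.tsv", sep="\t", header=True)

wide = df1.select("Name", "NumReads").withColumnRenamed("NumReads", "sample1") \
          .join(df2.select("Name", "NumReads").withColumnRenamed("NumReads", "sample2"),
                on="Name", how="inner")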
I think you are correct that lazy evaluation over a lot of joins may make the execution plan too complicated. To debug, I added

logger.warn("i:{}, num file rows:{} num joined rows:{}".format(i, df.count(), retDF.count()))

after each join to try to keep the execution plan from getting too complicated.
Once I set spark.sql.autoBroadcastJoinThreshold=-1, my big job started making some progress; however, it fails after a few files. Resources are maxed out!
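For reference, here is how I am setting it (a sketch; the same thing could also be passed with --conf on spark-submit):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)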
I estimate that the raw data should be < 500 GB. I am running a cluster with 2.8 TB of memory, which should be more than enough to cover Spark overhead.
Is Spark integrated with the Python garbage collector?
I assume createOrReplaceTempView() would cause the cache to get flushed as needed?
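If I do need to clean up explicitly, I assume it would look something like this (a sketch; I am not sure it is actually necessary):

# drop the named temp view once it is no longer needed
spark.catalog.dropTempView("sample")

# release a DataFrame that was explicitly cached/persisted
sampleDF.unpersist()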
Kind regards
Andy
###############################################################################
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    Load the NumReads column from each salmon quant file and join them
    into a single wide table keyed on Name.
    '''
    self.logger.info("BEGIN")
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

    for i in range(len(self.fileList)):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load(quantFile, format="csv", sep="\t",
                                        schema=quantSchema, header="true")
        # did not fix bug .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select(["Name", "NumReads"]) \
                           .withColumnRenamed("NumReads", sampleName)
        sampleDF.createOrReplaceTempView("sample")

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(),
                                 len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to the table of reads
        #
        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '''select rc.*, `{}`
                     from
                         retNumReadsDF as rc,
                         sample
                     where
                         rc.Name == sample.Name
                  '''.format(sampleName)
        self.logger.debug("sqlStmt:\n{}\n".format(sqlStmt))

        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql(sqlStmt)

        retNumReadsDF.createOrReplaceTempView("retNumReadsDF")

        #
        # debug. it seems like we do not make progress when we run on training data:
        # nothing happens, logs do not change, cluster metrics drop suggesting no work
        # is being done.
        # add an action to try and debug.
        # this should not change the physical plan, i.e. we still have the same number
        # of shuffles, which results in the same number of stages. We are just not
        # building up a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(),
                                 len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()))

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info("END\n")
    return retNumReadsDF
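Re: the checkpointing suggestion below, this is roughly what I plan to try inside the loop (a sketch; the bucket path and the every-10-iterations interval are placeholders):

# once, at start up: where checkpointed data gets materialized
self.spark.sparkContext.setCheckpointDir("gs://my-bucket/checkpoints")  # placeholder path

# inside the loop, after the join: materialize and truncate the lineage
# so the plan does not keep growing to thousands of stages
if i % 10 == 0:
    retNumReadsDF = retNumReadsDF.checkpoint()
    retNumReadsDF.createOrReplaceTempView("retNumReadsDF")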
From: David Diebold <[email protected]>
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson <[email protected]>, "user @spark"
<[email protected]>
Subject: Re: Pyspark debugging best practices
Hello Andy,
Are you sure you want to perform lots of join operations, and not simple unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual dataset size?
Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there?
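For example, something as simple as this would show how the plan grows after each join (just a sketch, using the df3 from your pseudo code):

df3.explain(True)   # prints the extended logical plans and the physical plan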
Cheers,
David
On Thu, Dec 30, 2021 at 16:56, Andrew Davidson <[email protected]> wrote:
Hi Gourav
I will give Databricks a try.
Each data file gets loaded into a data frame.
I select one column from the data frame.
I join that column to the result accumulated from the joins of the previous data frames in the list.
To debug, I think I am going to put an action and a log statement after each join. I do not think it will change the performance. I believe the physical plan will be the same; however, hopefully it will shed some light.
At the very least I will know whether it is making progress or not, and hopefully where it is breaking.
Happy new year
Andy
On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta <[email protected]> wrote:
Hi Andrew,
Any chance you might give Databricks a try in GCP?
The above transformations look complicated to me; why are you adding dataframes to a list?
Regards,
Gourav Sengupta
On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <[email protected]>
wrote:
Hi
I am having trouble debugging my driver. It runs correctly on smaller data sets but fails on large ones. It is very hard to figure out what the bug is. I suspect it may have something to do with the way Spark is installed and configured. I am using Google Cloud Platform Dataproc PySpark.
The log messages are not helpful. The error message will be something like
"User application exited with status 1"
And
jsonPayload: {
  class: "server.TThreadPoolServer"
  filename: "hive-server2.log"
  message: "Error occurred during processing of message."
  thread: "HiveServer2-Handler-Pool: Thread-40"
}
I am able to access the Spark history server; however, it does not capture anything if the driver crashes. I am unable to figure out how to access the Spark web UI.
My driver program looks something like the pseudo code below: a long list of transforms with a single action (i.e. write) at the end. Adding log messages is not helpful because of lazy evaluation. I am tempted to add something like

logger.warn("DEBUG df.count():{}".format(df.count()))

to try and inline some sort of diagnostic message.
What do you think?
Is there a better way to debug this?
Kind regards
Andy
def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load(filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select(....)
        list2OfDF.append(df2)

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]
    for i in range(1, len(list2OfDF)):
        df = list2OfDF[i]
        df3 = df3.join(df, ...)

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3
    return df3

def main():
    df = run()
    df.write()