Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data 
frames.

I think you correct that using lazy evaluation over a lot of joins may make the 
execution plan to complicated. To debug I added

logger.warn( “i:{}, num file rows:{} num joined rows:{}”.format(i, df.count(), 
retDF.count()  )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making 
some progress how ever fails after a few files. Resources are maxed out!



I estimated that that the raw data should be < 500 GB. I am running a cluster 
with 2.8 TB that should be more than enough to spark over head



Is spark integrated with the python garbage collector?



I assume createOrReplaceTempView() would cause cache to get flushed as needed?



Kind regards



Andy


    
###############################################################################
    def _loadSalmonReadsTable(self):
        '''
        AEDWIP TODO
        '''
        self.logger.info( "BEGIN" )
        retNumReadsDF = None
        quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, 
`TPM` DOUBLE, `NumReads` DOUBLE "
        for i in range( len(self.fileList) ):
            #
            # get NumReads from next salmon quant file
            #
            quantFile = self.fileList[i]
            sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                     schema=quantSchema, header="true" )
                                    # did not fix bug .repartition(50)

            sampleName = self.sampleNamesList[i]
            sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                            .withColumnRenamed( "NumReads", sampleName )

            sampleDF.createOrReplaceTempView( "sample" )

            self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} 
num cols:{} num parts:{}"
                             .format(i, sampleName, sampleDF.count(), 
len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

            #
            # append NumReads to table of reads
            #

            # the sample name must be quoted else column names with a '-'
            # like 1117F-0426-SM-5EGHI will generate an error
            # spark think the '-' is an expression. '_' is also
            # a special char for the sql like operator
            # https://stackoverflow.com/a/63899306/4586180
            sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\
                            from \n\
                               retNumReadsDF as rc, \n\
                               sample  \n\
                            where \n\
                                rc.Name == sample.Name \n'.format( sampleName )

            self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
            if i == 0 :
                retNumReadsDF = sampleDF
            else :
                retNumReadsDF = self.spark.sql( sqlStmt )

            retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

            #
            # debug. seems like we do not make progress when we run on training
            # nothing happens, logs do not change, cluster metrics drop 
suggesting no work
            # is being done
            # add an action to try and debug
            # this should not change the physical plan. I.e. we still have the 
same number of shuffles
            # which results in the same number of stage. We are just not 
building up a plan with thousands
            # of stages.
            #
            self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} 
num parts:{}"
                             .format(i, retNumReadsDF.count(), 
len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()) )

            #
            # TODO AEDWIP spark analyze chapter 18 debugging joins

            # execution plan should be the same for each join
            #rawCountsSDF.explain()

        self.logger.info( "END\n" )
        return retNumReadsDF


From: David Diebold <davidjdieb...@gmail.com>
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>, "user @spark" 
<user@spark.apache.org>
Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions 
?
Are you doing inner joins or outer joins ?
Can you provide us with a rough amount of your list size plus each individual 
dataset size ?
Have a look at execution plan would help, maybe the high amount of join 
operations makes execution plan too complicated at the end of the day ; 
checkpointing could help there ?

Cheers,
David


Le jeu. 30 déc. 2021 à 16:56, Andrew Davidson <aedav...@ucsc.edu.invalid> a 
écrit :
Hi Gourav

I will give databricks a try.

Each data gets loaded into a data frame.
I select one column from the data frame
I join the column to the  accumulated joins from previous data frames in the 
list

To debug. I think am gaining to put an action and log statement after each 
join. I do not think it will change the performance. I believe the physical 
plan will be the same how ever hopefully it will shed some light.

At the very least I will know if it making progress or not. And hopefully where 
it is breaking

Happy new year

Andy

On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
<gourav.sengu...@gmail.com<mailto:gourav.sengu...@gmail.com>> wrote:
Hi Andrew,

Any chance you might give Databricks a try in GCP?

The above transformations look complicated to me, why are you adding dataframes 
to a list?


Regards,
Gourav Sengupta



On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <aedav...@ucsc.edu.invalid> 
wrote:
Hi

I am having trouble debugging my driver. It runs correctly on smaller data set 
but fails on large ones.  It is very hard to figure out what the bug is. I 
suspect it may have something do with the way spark is installed and 
configured. I am using google cloud platform dataproc pyspark

The log messages are not helpful. The error message will be something like
"User application exited with status 1"

And

jsonPayload: {
class: "server.TThreadPoolServer"
filename: "hive-server2.log"
message: "Error occurred during processing of message."
thread: "HiveServer2-Handler-Pool: Thread-40"
}

I am able to access the spark history server however it does not capture 
anything if the driver crashes. I am unable to figure out how to access spark 
web UI.

My driver program looks something like the pseudo code bellow. A long list of 
transforms with a single action, (i.e. write) at the end. Adding log messages 
is not helpful because of lazy evaluations. I am tempted to add something like

Logger.warn( “DEBUG df.count():{}”.format( df.count() )” to try and inline some 
sort of diagnostic message.

What do you think?

Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load( filePath, ...)
        listOfDF.append(df)


    list2OfDF = []
    for df in listOfDF:
        df2 = df.select( .... )
        lsit2OfDF.append( df2 )

    # will setting list to None free cache?
    # or just driver memory
    listOfDF = None


    df3 = list2OfDF[0]

    for i in range( 1, len(list2OfDF) ):
        df = list2OfDF[i]
        df3 = df3.join(df ...)

    # will setting to list to None free cache?
    # or just driver memory
    List2OfDF = None


    lots of narrow transformations on d3

    return df3

def main() :
    df = run()
    df.write()



Reply via email to