Re: Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Sonal Goyal
Cheers,
Sonal
https://github.com/zinggAI/zingg


Joining many tables Re: Pyspark debugging best practices

2022-01-03 Thread Andrew Davidson
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns.

As far as I know join() is the only way to append columns from different data 
frames.
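
A tiny sketch of what I mean (made-up sample columns; Name is the shared key 
from the quant files):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

a = spark.createDataFrame( [("g1", 10.0), ("g2", 5.0)], ["Name", "sampleA"] )
b = spark.createDataFrame( [("g1",  7.0), ("g2", 2.0)], ["Name", "sampleB"] )

# union() stacks rows: b's values would end up underneath sampleA, in the same column
# stacked = a.union(b)

# a key based join appends sampleB as a new column next to sampleA
wide = a.join( b, on="Name", how="inner" )   # columns: Name, sampleA, sampleB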

I think you are correct that lazy evaluation over a lot of joins may make the 
execution plan too complicated. To debug I added

logger.warn( "i:{}, num file rows:{} num joined rows:{}".format( i, df.count(), 
retDF.count() ) )

to try and simplify the execution plan.


Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making 
some progress, however it fails after a few files. Resources are maxed out!
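
For reference, that setting can be applied in the driver or at submit time; 
-1 disables automatic broadcast joins entirely:

spark.conf.set( "spark.sql.autoBroadcastJoinThreshold", -1 )

# or at submit time:
#   spark-submit --conf spark.sql.autoBroadcastJoinThreshold=-1 ...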



I estimated that the raw data should be < 500 GB. I am running a cluster with 
2.8 TB, which should be more than enough to cover Spark overhead.



Is Spark integrated with the Python garbage collector?

I assume createOrReplaceTempView() would cause the cache to get flushed as 
needed?
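
In case it is useful, here is a sketch of managing that explicitly in the join 
loop (see the code below) instead of relying on the temp view being replaced. 
These are standard PySpark calls; whether it actually helps this job is an open 
question:

newDF = self.spark.sql( sqlStmt ).cache()
newDF.count()                      # action so the cache is actually populated
if retNumReadsDF is not None:
    retNumReadsDF.unpersist()      # release storage held by the previous iteration
retNumReadsDF = newDF
retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

# note: dropping a temp view only removes the name, it does not free cached data
# self.spark.catalog.dropTempView( "retNumReadsDF" )
# self.spark.catalog.clearCache()  # last resort: clears everything cached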



Kind regards



Andy



###
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info( "BEGIN" )
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

    for i in range( len(self.fileList) ):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load( quantFile, format="csv", sep="\t",
                                         schema=quantSchema, header="true" )
        # did not fix bug: .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select( ["Name", "NumReads"] )\
                           .withColumnRenamed( "NumReads", sampleName )

        sampleDF.createOrReplaceTempView( "sample" )

        self.logger.warn( "AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                          .format( i, sampleName, sampleDF.count(),
                                   len(sampleDF.columns), sampleDF.rdd.getNumPartitions() ) )

        #
        # append NumReads to the table of reads
        #
        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # Spark thinks the '-' is an expression. '_' is also
        # a special char for the SQL LIKE operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = 'select rc.*, `{}` \n\
                   from \n\
                       retNumReadsDF as rc, \n\
                       sample \n\
                   where \n\
                       rc.Name == sample.Name \n'.format( sampleName )

        self.logger.debug( "sqlStmt:\n{}\n".format( sqlStmt ) )
        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql( sqlStmt )

        retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )

        #
        # debug: it seems like we do not make progress when we run on the training data.
        # Nothing happens, the logs do not change, and the cluster metrics drop,
        # suggesting no work is being done.
        # Add an action to try and debug. This should not change the physical plan,
        # i.e. we still have the same number of shuffles, which results in the same
        # number of stages. We are just not building up a plan with thousands of stages.
        #
        self.logger.warn( "AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                          .format( i, retNumReadsDF.count(),
                                   len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions() ) )

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        #
        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info( "END\n" )
    return retNumReadsDF
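
Following David's checkpoint suggestion, the loop body above could also 
truncate the lineage every few files. This is only a sketch: checkpoint() is 
standard PySpark, it needs a checkpoint directory (the gs:// path below is made 
up), and the every-10-files interval is arbitrary:

# once, before the loop (hypothetical bucket path):
# self.spark.sparkContext.setCheckpointDir( "gs://my-bucket/checkpoints" )

if i > 0 and i % 10 == 0:
    # writes retNumReadsDF to the checkpoint dir and cuts its lineage, so later
    # joins start from a short plan instead of one with thousands of stages
    retNumReadsDF = retNumReadsDF.checkpoint( eager=True )
    retNumReadsDF.createOrReplaceTempView( "retNumReadsDF" )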



Re: Pyspark debugging best practices

2022-01-03 Thread David Diebold
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple
unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual
dataset size?
Having a look at the execution plan would help; maybe the high number of
join operations makes the execution plan too complicated at the end of the
day. Checkpointing could help there?
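
For example, printing the plan after each join would show how quickly it grows 
(standard DataFrame.explain(); retNumReadsDF is the accumulated result in your 
loop):

retNumReadsDF.explain()       # physical plan only
retNumReadsDF.explain(True)   # parsed, analyzed, optimized and physical plans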

Cheers,
David




Re: Pyspark debugging best practices

2021-12-30 Thread Andrew Davidson
Hi Gourav

I will give Databricks a try.

Each dataset gets loaded into a data frame.
I select one column from the data frame.
I join that column to the accumulated joins from the previous data frames
in the list.

To debug, I think I am going to put an action and a log statement after each
join. I do not think it will change the performance. I believe the physical
plan will be the same, however hopefully it will shed some light.

At the very least I will know if it is making progress or not, and hopefully
where it is breaking.
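
Roughly this pattern, as a sketch (joinedDF, the key column, and the logger are
placeholders):

joinedDF = joinedDF.join( df, on="Name", how="inner" )   # "Name" is an assumed key column
logger.warn( "join {} done: rows:{} parts:{}"
             .format( i, joinedDF.count(), joinedDF.rdd.getNumPartitions() ) )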

Happy new year

Andy



Re: Pyspark debugging best practices

2021-12-28 Thread Gourav Sengupta
Hi Andrew,

Any chance you might give Databricks a try in GCP?

The transformations you posted look complicated to me; why are you adding
dataframes to a list?
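
For example, the intermediate lists could be avoided by folding the joins
directly over the files, something like this sketch (functools.reduce is
standard Python; fileList, sampleNames, the column names and the join key are
assumptions based on your description):

from functools import reduce

def load_sample(path, name):
    # read one file and keep only the key column and the renamed count column
    df = spark.read.load( path, format="csv", sep="\t", header="true" )
    return df.select( "Name", "NumReads" ).withColumnRenamed( "NumReads", name )

# fold the joins without keeping every intermediate DataFrame in a list
wide = reduce( lambda acc, nxt: acc.join( nxt, on="Name", how="inner" ),
               ( load_sample(p, n) for p, n in zip( fileList, sampleNames ) ) )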


Regards,
Gourav Sengupta





Pyspark debugging best practices

2021-12-26 Thread Andrew Davidson
Hi

I am having trouble debugging my driver. It runs correctly on a smaller data 
set but fails on large ones. It is very hard to figure out what the bug is. I 
suspect it may have something to do with the way Spark is installed and 
configured. I am using Google Cloud Platform Dataproc PySpark.

The log messages are not helpful. The error message will be something like
"User application exited with status 1"

And

jsonPayload: {
class: "server.TThreadPoolServer"
filename: "hive-server2.log"
message: "Error occurred during processing of message."
thread: "HiveServer2-Handler-Pool: Thread-40"
}

I am able to access the Spark history server; however, it does not capture 
anything if the driver crashes. I am unable to figure out how to access the 
Spark web UI.

My driver program looks something like the pseudo code below: a long list of 
transforms with a single action (i.e. write) at the end. Adding log messages 
is not helpful because of lazy evaluation. I am tempted to add something like

logger.warn( "DEBUG df.count():{}".format( df.count() ) )

to try and inline some sort of diagnostic message.
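
A small helper along those lines, as a sketch (the label argument and the
partition count are illustrative additions):

def log_progress(logger, label, df):
    # count() forces evaluation, so the log line doubles as a progress marker
    logger.warn( "DEBUG {}: rows:{} parts:{}"
                 .format( label, df.count(), df.rdd.getNumPartitions() ) )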

What do you think?

Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load( filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select( ... )
        list2OfDF.append( df2 )

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]

    for i in range( 1, len(list2OfDF) ):
        df = list2OfDF[i]
        df3 = df3.join(df ...)

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3

    return df3

def main():
    df = run()
    df.write()