Re: Joining many tables Re: Pyspark debugging best practices
--
Cheers,
Sonal
https://github.com/zinggAI/zingg
Joining many tables Re: Pyspark debugging best practices
Hi David

I need to select one column from many files and combine them into a single table. I do not believe union() will work; it appends rows, not columns. As far as I know, join() is the only way to append columns from different data frames.

I think you are correct that lazy evaluation over a lot of joins may make the execution plan too complicated. To debug, I added

    logger.warn("i:{}, num file rows:{} num joined rows:{}".format(i, df.count(), retDF.count()))

to try and simplify the execution plan. Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making some progress, however it fails after a few files. Resources are maxed out! I estimated that the raw data should be < 500 GB. I am running a cluster with 2.8 TB; that should be more than enough to cover Spark overhead.

Is Spark integrated with the Python garbage collector? I assume createOrReplaceTempView() would cause the cache to get flushed as needed?

Kind regards

Andy

###

def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info("BEGIN")
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

    for i in range(len(self.fileList)):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load(quantFile, format="csv", sep="\t",
                                        schema=quantSchema, header="true")
        # did not fix bug: .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select(["Name", "NumReads"]) \
                           .withColumnRenamed("NumReads", sampleName)
        sampleDF.createOrReplaceTempView("sample")

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(),
                                 len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to the table of reads
        #
        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql LIKE operator
        # https://stackoverflow.com/a/63899306/4586180
        #
        sqlStmt = '''select rc.*, `{}`
                     from
                         retNumReadsDF as rc,
                         sample
                     where
                         rc.Name == sample.Name'''.format(sampleName)

        self.logger.debug("sqlStmt:\n{}\n".format(sqlStmt))

        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql(sqlStmt)

        retNumReadsDF.createOrReplaceTempView("retNumReadsDF")

        #
        # debug: it seems like we do not make progress when we run on training.
        # nothing happens, logs do not change, cluster metrics drop, suggesting no work
        # is being done. add an action to try and debug.
        # this should not change the physical plan, i.e. we still have the same number of
        # shuffles, which results in the same number of stages. We are just not building
        # up a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(),
                                 len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()))

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        #
        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info("END\n")
    return retNumReadsDF

From: David Diebold
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson, "user @spark" <user@spark.apache.org>
Subject: Re: Pyspark debugging best practices
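A minimal sketch of how spark.sql.autoBroadcastJoinThreshold=-1 (mentioned above) can be set, either when the session is built or on a live session; the app name below is illustrative and not from the thread:

    from pyspark.sql import SparkSession

    # disable automatic broadcast joins at session creation time...
    spark = (SparkSession.builder
             .appName("loadSalmonReadsTable")  # illustrative name
             .config("spark.sql.autoBroadcastJoinThreshold", "-1")
             .getOrCreate())

    # ...or change it on an existing session; this conf is modifiable at runtime
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    # explain() prints the physical plan, one way to confirm which join strategy
    # (broadcast vs. sort-merge) was picked for the accumulated join
    # retNumReadsDF.explain()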
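A small illustration of the union-versus-join point above, using two tiny made-up tables that share a Name key and assuming a SparkSession named spark: union() grows the table by rows, while a join on the key grows it by columns.

    # two tiny sample tables sharing a "Name" key (made-up data)
    a = spark.createDataFrame([("g1", 10.0), ("g2", 5.0)], ["Name", "sampleA"])
    b = spark.createDataFrame([("g1", 7.0), ("g2", 2.0)], ["Name", "sampleB"])

    # union() resolves columns by position and stacks rows:
    # the result still has two columns (Name, sampleA) but four rows
    stacked = a.union(b)

    # a join on the shared key appends sampleB as a new column:
    # the result has three columns (Name, sampleA, sampleB) and two rows
    wide = a.join(b, on="Name", how="inner")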
Re: Pyspark debugging best practices
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size and of each individual dataset's size?
Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there.

Cheers,
David
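A rough sketch of the checkpointing idea: DataFrame.checkpoint() materializes the data and truncates lineage, so the plan for later joins stops growing. It needs a checkpoint directory the cluster can write to; the bucket path and the every-10-joins cadence below are made up, and the variable names follow the pseudo code later in the thread.

    # checkpoint() requires a checkpoint directory to be set first
    spark.sparkContext.setCheckpointDir("gs://some-bucket/spark-checkpoints")  # hypothetical path

    df3 = list2OfDF[0]
    for i in range(1, len(list2OfDF)):
        df3 = df3.join(list2OfDF[i], on="Name")
        if i % 10 == 0:
            # materialize and cut the lineage so the plan does not grow without bound
            df3 = df3.checkpoint(eager=True)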
Re: Pyspark debugging best practices
Hi Gourav

I will give Databricks a try.

Each dataset gets loaded into a data frame. I select one column from the data frame, then join that column to the accumulated joins from the previous data frames in the list.

To debug, I think I am going to put an action and a log statement after each join. I do not think it will change the performance; I believe the physical plan will be the same. However, hopefully it will shed some light.

At the very least I will know whether it is making progress or not, and hopefully where it is breaking.

Happy new year

Andy
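A sketch of the action-plus-log idea described above, assuming a Python logger is already configured and that the join key is Name as in the code posted elsewhere in this thread; count() is an action, so the last log line written shows which join the job reached before stalling.

    retDF = None
    for i, df in enumerate(list2OfDF):
        retDF = df if retDF is None else retDF.join(df, on="Name")
        # count() forces the join above to execute now instead of piling up lazily
        logger.warn("i:{} rows:{} cols:{} parts:{}".format(
            i, retDF.count(), len(retDF.columns), retDF.rdd.getNumPartitions()))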
Re: Pyspark debugging best practices
Hi Andrew,

Any chance you might give Databricks a try in GCP?

The above transformations look complicated to me; why are you adding dataframes to a list?

Regards,
Gourav Sengupta
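One way to read the question about the list: the intermediate lists are not needed to express the joins. A hedged sketch, assuming each file carries a Name key and a NumReads column as in Andy's code, of folding the joins with functools.reduce instead of accumulating dataframes in lists; fileList and sampleNamesList here mirror the attributes in Andy's class. This does not simplify the physical plan by itself, it only removes the list bookkeeping.

    from functools import reduce

    def load_one(path, sample_name):
        # keep only the key plus the per-sample column; column names follow Andy's code
        return (spark.read.load(path, format="csv", sep="\t", header="true")
                     .select("Name", "NumReads")
                     .withColumnRenamed("NumReads", sample_name))

    dfs = (load_one(p, n) for p, n in zip(fileList, sampleNamesList))
    wide = reduce(lambda left, right: left.join(right, on="Name"), dfs)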
Pyspark debugging best practices
Hi

I am having trouble debugging my driver. It runs correctly on a smaller data set but fails on large ones. It is very hard to figure out what the bug is. I suspect it may have something to do with the way Spark is installed and configured. I am using Google Cloud Platform Dataproc pyspark.

The log messages are not helpful. The error message will be something like

    "User application exited with status 1"

And

    jsonPayload: {
        class: "server.TThreadPoolServer"
        filename: "hive-server2.log"
        message: "Error occurred during processing of message."
        thread: "HiveServer2-Handler-Pool: Thread-40"
    }

I am able to access the Spark history server, however it does not capture anything if the driver crashes. I am unable to figure out how to access the Spark web UI.

My driver program looks something like the pseudo code below: a long list of transforms with a single action (i.e. write) at the end. Adding log messages is not helpful because of lazy evaluation. I am tempted to add something like

    logger.warn("DEBUG df.count():{}".format(df.count()))

to try and inline some sort of diagnostic message.

What do you think?

Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load(filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select(...)
        list2OfDF.append(df2)

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]
    for i in range(1, len(list2OfDF)):
        df = list2OfDF[i]
        df3 = df3.join(df, ...)

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3

    return df3

def main():
    df = run()
    df.write.save(...)
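On the "will setting the list to None free cache?" question in the pseudo code above: dropping a Python reference only frees driver-side objects. Executor-side caching is explicit; a dataframe is only cached after persist()/cache() and released with unpersist() (or clearCache() for everything in the session). A minimal sketch, with the column names borrowed from elsewhere in the thread:

    df2 = df.select("Name", "NumReads")
    df2.cache()          # nothing is cached on the executors unless we ask for it

    # ... use df2 in joins ...

    df2.unpersist()      # releases the cached blocks on the executors
    df2 = None           # only drops the driver-side Python reference

    # or drop every cached table / DataFrame in the session
    spark.catalog.clearCache()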