Hi David,

I need to select one column from each of many files and combine them into a single table.
I do not believe union() will work. It appends rows, not columns. As far as I know, join() is the only way to append columns from different data frames. I think you are correct that lazy evaluation over a lot of joins may make the execution plan too complicated.

To debug, I added logger.warn("i:{}, num file rows:{} num joined rows:{}".format(i, df.count(), retDF.count())) to force an action after each join and see whether the job is making progress.

Once I set spark.sql.autoBroadcastJoinThreshold=-1, my big job started making some progress; however, it fails after a few files. Resources are maxed out! I estimated that the raw data should be < 500 GB. I am running a cluster with 2.8 TB of memory; that should be more than enough to cover Spark's overhead.

Is Spark integrated with the Python garbage collector? I assume createOrReplaceTempView() would cause the cache to get flushed as needed?

Kind regards

Andy

###############################################################################
def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info("BEGIN")
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

    for i in range(len(self.fileList)):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load(quantFile, format="csv", sep="\t",
                                        schema=quantSchema, header="true")
        # .repartition(50) did not fix the bug

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select(["Name", "NumReads"]) \
                           .withColumnRenamed("NumReads", sampleName)
        sampleDF.createOrReplaceTempView("sample")

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(), len(sampleDF.columns),
                                 sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to the table of reads
        #
        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # spark thinks the '-' is an expression. '_' is also
        # a special char for the sql like operator
        # https://stackoverflow.com/a/63899306/4586180
        #
        sqlStmt = '''select rc.*, `{}`
                     from
                         retNumReadsDF as rc,
                         sample
                     where
                         rc.Name == sample.Name
                  '''.format(sampleName)

        self.logger.debug("sqlStmt:\n{}\n".format(sqlStmt))

        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql(sqlStmt)

        retNumReadsDF.createOrReplaceTempView("retNumReadsDF")

        #
        # debug. it seems like we do not make progress when we run on the training data:
        # nothing happens, logs do not change, cluster metrics drop suggesting no work
        # is being done.
        # add an action to try and debug.
        # this should not change the physical plan, i.e. we still have the same number of
        # shuffles, which results in the same number of stages. We are just not building up
        # a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(), len(retNumReadsDF.columns),
                                 retNumReadsDF.rdd.getNumPartitions()))

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        # the execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info("END\n")
    return retNumReadsDF
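Based on your checkpoint suggestion below, here is a rough, untested sketch of the variant I am thinking of trying. The checkpoint directory path and the every-10-files interval are just guesses on my part, and fileList / sampleNamesList stand in for self.fileList / self.sampleNamesList above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical checkpoint location; on Dataproc this would be a GCS path
spark.sparkContext.setCheckpointDir("gs://my-bucket/tmp/checkpoints")

quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

retNumReadsDF = None
for i, (quantFile, sampleName) in enumerate(zip(fileList, sampleNamesList)):
    sampleDF = (spark.read.load(quantFile, format="csv", sep="\t",
                                schema=quantSchema, header="true")
                .select("Name", "NumReads")
                .withColumnRenamed("NumReads", sampleName))

    if i == 0:
        retNumReadsDF = sampleDF
    else:
        retNumReadsDF = retNumReadsDF.join(sampleDF, on="Name", how="inner")

    if i % 10 == 0:
        # checkpoint() materializes the intermediate result and returns a data frame
        # with a truncated lineage, so the plan does not keep growing with every join
        retNumReadsDF = retNumReadsDF.checkpoint(eager=True)

As I understand it, the eager checkpoint trades some extra I/O to the checkpoint directory for a bounded plan depth, so each iteration's plan starts from the last checkpoint instead of from the very first file.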
From: David Diebold <davidjdieb...@gmail.com>
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson <aedav...@ucsc.edu.invalid>, "user @spark" <user@spark.apache.org>
Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions? Are you doing inner joins or outer joins?

Can you give us a rough idea of your list size plus each individual dataset size?

Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there?

Cheers,
David

On Thu, Dec 30, 2021 at 4:56 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi Gourav

I will give Databricks a try.

Each data set gets loaded into a data frame. I select one column from the data frame. I join that column to the accumulated joins from the previous data frames in the list.

To debug, I think I am going to put an action and a log statement after each join. I do not think it will change the performance; I believe the physical plan will be the same. However, hopefully it will shed some light. At the very least I will know whether it is making progress or not, and hopefully where it is breaking.

Happy new year

Andy

On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi Andrew,

Any chance you might give Databricks a try in GCP?

The above transformations look complicated to me; why are you adding dataframes to a list?

Regards,
Gourav Sengupta

On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:

Hi

I am having trouble debugging my driver. It runs correctly on a smaller data set but fails on large ones. It is very hard to figure out what the bug is. I suspect it may have something to do with the way Spark is installed and configured. I am using Google Cloud Platform Dataproc PySpark.

The log messages are not helpful. The error message will be something like "User application exited with status 1" and

jsonPayload: {
    class: "server.TThreadPoolServer"
    filename: "hive-server2.log"
    message: "Error occurred during processing of message."
    thread: "HiveServer2-Handler-Pool: Thread-40"
}

I am able to access the Spark history server; however, it does not capture anything if the driver crashes. I am unable to figure out how to access the Spark web UI.

My driver program looks something like the pseudo code below: a long list of transforms with a single action (i.e. write) at the end. Adding log messages is not helpful because of lazy evaluation. I am tempted to add something like logger.warn("DEBUG df.count():{}".format(df.count())) to try and inline some sort of diagnostic message.

What do you think? Is there a better way to debug this?

Kind regards

Andy

def run():
    listOfDF = []
    for filePath in listOfFiles:
        df = spark.read.load(filePath, ...)
        listOfDF.append(df)

    list2OfDF = []
    for df in listOfDF:
        df2 = df.select(....)
        list2OfDF.append(df2)

    # will setting the list to None free cache?
    # or just driver memory?
    listOfDF = None

    df3 = list2OfDF[0]
    for i in range(1, len(list2OfDF)):
        df = list2OfDF[i]
        df3 = df3.join(df, ...)

    # will setting the list to None free cache?
    # or just driver memory?
    list2OfDF = None

    # lots of narrow transformations on df3

    return df3

def main():
    df = run()
    df.write()
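For comparison with the join loop above, here is a rough, untested sketch of the "simple unions" route David asks about in his reply: read each file into long format (Name, sample, NumReads), union the pieces, and pivot once to get the wide table. fileList, sampleNamesList, and the quant schema are the ones from the code earlier in the thread; the "sample" column name and everything else are assumptions.

from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"

# build one long-format data frame per quant file: (Name, sample, NumReads)
longDFs = []
for quantFile, sampleName in zip(fileList, sampleNamesList):
    df = (spark.read.load(quantFile, format="csv", sep="\t",
                          schema=quantSchema, header="true")
          .select("Name", "NumReads")
          .withColumn("sample", F.lit(sampleName)))
    longDFs.append(df)

# union appends rows, which keeps the plan shallow; no joins are involved
longDF = reduce(lambda a, b: a.unionByName(b), longDFs)

# a single groupBy/pivot turns the long table into the wide one:
# one row per Name, one NumReads column per sample
wideDF = (longDF.groupBy("Name")
                .pivot("sample", sampleNamesList)
                .agg(F.first("NumReads")))

Passing sampleNamesList to pivot() avoids the extra job Spark would otherwise run to discover the distinct sample values, and the whole thing is one wide aggregation instead of a plan with one join per file.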