Re: Joining many tables Re: Pyspark debugging best practices
Hi Andrew,

Do you think the following would work? Build the data frames by appending a source column to each one (sampleName). Add the extra columns as per the quantSchema. Then union them. You end up with one data frame with many entries per Name, and you can then use windowing functions over it — something along the lines of the sketch below.
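A rough, untested sketch of that idea. It assumes the fileList, sampleNamesList, and quantSchema from your code below, and uses pivot rather than a window function to get back to one NumReads column per sample:

    from functools import reduce
    from pyspark.sql import functions as F

    dfs = []
    for quantFile, sampleName in zip(self.fileList, self.sampleNamesList):
        df = (self.spark.read.load(quantFile, format="csv", sep="\t",
                                   schema=quantSchema, header="true")
                  .select("Name", "NumReads")
                  .withColumn("sampleName", F.lit(sampleName)))
        dfs.append(df)

    # one long data frame with many entries per Name
    longDF = reduce(lambda a, b: a.unionByName(b), dfs)

    # pivot back to wide: one column per sample, a single wide shuffle
    # instead of one join per file
    wideDF = (longDF.groupBy("Name")
                    .pivot("sampleName", self.sampleNamesList)
                    .agg(F.first("NumReads")))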
On Tue, 4 Jan 2022 at 6:29 AM, Andrew Davidson wrote:

> Hi David
>
> I need to select 1 column from many files and combine them into a single table.
> [...]
Joining many tables Re: Pyspark debugging best practices
Hi David

I need to select 1 column from many files and combine them into a single table.

I do not believe union() will work. It appends rows, not columns. As far as I know, join() is the only way to append columns from different data frames.

I think you are correct that using lazy evaluation over a lot of joins may make the execution plan too complicated. To debug, I added

    logger.warn("i:{}, num file rows:{} num joined rows:{}".format(i, df.count(), retDF.count()))

to try and simplify the execution plan.

Once I set spark.sql.autoBroadcastJoinThreshold=-1 my big job started making some progress, however it fails after a few files. Resources are maxed out!

I estimated that the raw data should be < 500 GB. I am running a cluster with 2.8 TB; that should be more than enough to cover Spark overhead.

Is Spark integrated with the Python garbage collector?

I assume createOrReplaceTempView() would cause the cache to get flushed as needed?

Kind regards

Andy

###

def _loadSalmonReadsTable(self):
    '''
    AEDWIP TODO
    '''
    self.logger.info("BEGIN")
    retNumReadsDF = None
    quantSchema = "`Name` STRING, `Length` INT, `EffectiveLength` DOUBLE, `TPM` DOUBLE, `NumReads` DOUBLE"
    for i in range(len(self.fileList)):
        #
        # get NumReads from the next salmon quant file
        #
        quantFile = self.fileList[i]
        sampleDF = self.spark.read.load(quantFile, format="csv", sep="\t",
                                        schema=quantSchema, header="true")
        # did not fix bug: .repartition(50)

        sampleName = self.sampleNamesList[i]
        sampleDF = sampleDF.select(["Name", "NumReads"]) \
                           .withColumnRenamed("NumReads", sampleName)

        sampleDF.createOrReplaceTempView("sample")

        self.logger.warn("AEDWIP i:{} sampleName:{} sampleDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, sampleName, sampleDF.count(),
                                 len(sampleDF.columns), sampleDF.rdd.getNumPartitions()))

        #
        # append NumReads to the table of reads
        #

        # the sample name must be quoted, else column names with a '-'
        # like 1117F-0426-SM-5EGHI will generate an error:
        # Spark thinks the '-' is an expression. '_' is also
        # a special char for the SQL 'like' operator
        # https://stackoverflow.com/a/63899306/4586180
        sqlStmt = '''select rc.*, `{}`
                     from
                         retNumReadsDF as rc,
                         sample
                     where
                         rc.Name == sample.Name'''.format(sampleName)

        self.logger.debug("sqlStmt:\n{}\n".format(sqlStmt))
        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = self.spark.sql(sqlStmt)

        retNumReadsDF.createOrReplaceTempView("retNumReadsDF")

        #
        # debug. It seems like we do not make progress when we run on training data:
        # nothing happens, the logs do not change, and cluster metrics drop, suggesting
        # no work is being done.
        # Add an action to try and debug. This should not change the physical plan,
        # i.e. we still have the same number of shuffles, which results in the same
        # number of stages. We are just not building up a plan with thousands of stages.
        #
        self.logger.warn("AEDWIP i:{} retNumReadsDF.num rows:{} num cols:{} num parts:{}"
                         .format(i, retNumReadsDF.count(),
                                 len(retNumReadsDF.columns), retNumReadsDF.rdd.getNumPartitions()))

        #
        # TODO AEDWIP spark analyze chapter 18 debugging joins
        # execution plan should be the same for each join
        # rawCountsSDF.explain()

    self.logger.info("END\n")
    return retNumReadsDF

From: David Diebold
Date: Monday, January 3, 2022 at 12:39 AM
To: Andrew Davidson, "user @spark"
Subject: Re: Pyspark debugging best practices

Hello Andy,

Are you sure you want to perform lots of join operations, and not simple unions? Are you doing inner joins or outer joins?
Can you provide us with a rough idea of your list size, plus each individual dataset size? Having a look at the execution plan would help; maybe the high number of join operations makes the execution plan too complicated at the end of the day. Checkpointing could help there? (A rough sketch follows at the end of this message.)

Cheers,
David

On Thu, 30 Dec 2021 at 16:56, Andrew Davidson wrote:

Hi Gourav

I will give databricks a try.

Each dataset gets loaded into a data frame. I select one column from the data frame. I join the column to the accumulated joins from previous data frames in the
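For what it's worth, that checkpointing suggestion applied to the join loop might look roughly like this. This is an untested sketch: sampleDFs is a stand-in for the per-sample data frames built in the loop above, and the checkpoint directory and interval are made-up examples.

    # untested sketch: checkpoint every few joins to truncate the lineage
    spark.sparkContext.setCheckpointDir("gs://my-bucket/tmp/checkpoints")  # example path
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)             # as in Andy's run

    retNumReadsDF = None
    for i, sampleDF in enumerate(sampleDFs):  # sampleDFs: hypothetical list of (Name, <sample>) frames
        if i == 0:
            retNumReadsDF = sampleDF
        else:
            retNumReadsDF = retNumReadsDF.join(sampleDF, on="Name")
        if i > 0 and i % 10 == 0:
            # materialize and cut the lineage so the plan stays small
            # instead of growing to thousands of stages
            retNumReadsDF = retNumReadsDF.checkpoint()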