Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
Hi, it is the same thing when you are using Sql or dataframe api. actually, they will be optimized by catalyst then push to rdd. and in this case, there are many times on iteration, (16000 times). so you got a very big execution plan when you join the dataframe again and again, I think this is the reason you got the IOM and analysis exception. my suggestion is you need checkpoint the dataframe when joined 200 dataframes. so you can trancate the lineage. so the optimizer only analysis the 200 dataframe. this will reduce the pressure of spark engine. | | Hollis | Replied mail | From | Gourav Sengupta | | Date | 12/25/2021 03:46 | | To | Sean Owen | | Cc | Andrew Davidson、Nicholas Gustafson、User | | Subject | Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns | Hi, may be I am getting confused as always :) , but the requirement looked pretty simple to me to be implemented in SQL, or it is just the euphoria of Christmas eve Anyways, in case the above can be implemented in SQL, then I can have a look at it. Yes, indeed there are bespoke scenarios where dataframes may apply and RDD are used, but for UDF's I prefer SQL as well, but that may be a personal idiosyncrasy. The Oreilly book on data algorithms using SPARK, pyspark uses dataframes and RDD API's :) Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 6:11 PM Sean Owen wrote: This is simply not generally true, no, and not in this case. The programmatic and SQL APIs overlap a lot, and where they do, they're essentially aliases. Use whatever is more natural. What I wouldn't recommend doing is emulating SQL-like behavior in custom code, UDFs, etc. The native operators will be faster. Sometimes you have to go outside SQL where necessary, like in UDFs or complex aggregation logic. Then you can't use SQL. On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta wrote: Hi, yeah I think that in practice you will always find that dataframes can give issues regarding a lot of things, and then you can argue. In the SPARK conference, I think last year, it was shown that more than 92% or 95% use the SPARK SQL API, if I am not mistaken. I think that you can do the entire processing at one single go. Can you please write down the end to end SQL and share without the 16000 iterations? Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson wrote: Hi Sean and Gourav Thanks for the suggestions. I thought that both the sql and dataframe apis are wrappers around the same frame work? Ie. catalysts. I tend to mix and match my code. Sometimes I find it easier to write using sql some times dataframes. What is considered best practices? Here is an example
Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
Hi, may be I am getting confused as always :) , but the requirement looked pretty simple to me to be implemented in SQL, or it is just the euphoria of Christmas eve Anyways, in case the above can be implemented in SQL, then I can have a look at it. Yes, indeed there are bespoke scenarios where dataframes may apply and RDD are used, but for UDF's I prefer SQL as well, but that may be a personal idiosyncrasy. The Oreilly book on data algorithms using SPARK, pyspark uses dataframes and RDD API's :) Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 6:11 PM Sean Owen wrote: > This is simply not generally true, no, and not in this case. The > programmatic and SQL APIs overlap a lot, and where they do, they're > essentially aliases. Use whatever is more natural. > What I wouldn't recommend doing is emulating SQL-like behavior in custom > code, UDFs, etc. The native operators will be faster. > Sometimes you have to go outside SQL where necessary, like in UDFs or > complex aggregation logic. Then you can't use SQL. > > On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta < > gourav.sengu...@gmail.com> wrote: > >> Hi, >> >> yeah I think that in practice you will always find that dataframes can >> give issues regarding a lot of things, and then you can argue. In the SPARK >> conference, I think last year, it was shown that more than 92% or 95% use >> the SPARK SQL API, if I am not mistaken. >> >> I think that you can do the entire processing at one single go. >> >> Can you please write down the end to end SQL and share without the 16000 >> iterations? >> >> >> Regards, >> Gourav Sengupta >> >> >> On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson >> wrote: >> >>> Hi Sean and Gourav >>> >>> >>> >>> Thanks for the suggestions. I thought that both the sql and dataframe >>> apis are wrappers around the same frame work? Ie. catalysts. >>> >>> >>> >>> I tend to mix and match my code. Sometimes I find it easier to write >>> using sql some times dataframes. What is considered best practices? >>> >>> >>> >>> Here is an example >>> >>> >>> >>>
Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
This is simply not generally true, no, and not in this case. The programmatic and SQL APIs overlap a lot, and where they do, they're essentially aliases. Use whatever is more natural. What I wouldn't recommend doing is emulating SQL-like behavior in custom code, UDFs, etc. The native operators will be faster. Sometimes you have to go outside SQL where necessary, like in UDFs or complex aggregation logic. Then you can't use SQL. On Fri, Dec 24, 2021 at 12:05 PM Gourav Sengupta wrote: > Hi, > > yeah I think that in practice you will always find that dataframes can > give issues regarding a lot of things, and then you can argue. In the SPARK > conference, I think last year, it was shown that more than 92% or 95% use > the SPARK SQL API, if I am not mistaken. > > I think that you can do the entire processing at one single go. > > Can you please write down the end to end SQL and share without the 16000 > iterations? > > > Regards, > Gourav Sengupta > > > On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson wrote: > >> Hi Sean and Gourav >> >> >> >> Thanks for the suggestions. I thought that both the sql and dataframe >> apis are wrappers around the same frame work? Ie. catalysts. >> >> >> >> I tend to mix and match my code. Sometimes I find it easier to write >> using sql some times dataframes. What is considered best practices? >> >> >> >> Here is an example >> >> >> >>
Re: OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
Hi, yeah I think that in practice you will always find that dataframes can give issues regarding a lot of things, and then you can argue. In the SPARK conference, I think last year, it was shown that more than 92% or 95% use the SPARK SQL API, if I am not mistaken. I think that you can do the entire processing at one single go. Can you please write down the end to end SQL and share without the 16000 iterations? Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 5:16 PM Andrew Davidson wrote: > Hi Sean and Gourav > > > > Thanks for the suggestions. I thought that both the sql and dataframe apis > are wrappers around the same frame work? Ie. catalysts. > > > > I tend to mix and match my code. Sometimes I find it easier to write using > sql some times dataframes. What is considered best practices? > > > > Here is an example > > > > Case 1 > >for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 > times! > > sampleName = self.sampleNamesList[i] > > sampleDF= quantSparkDFList[i] > >sampleSDF.createOrReplaceTempView( "sample" ) > > > > # the sample name must be quoted else column names with a '-' > > # like GTEX-1117F-0426-SM-5EGHI will generate an error > > # spark thinks the '-' is an expression. '_' is also > > # a special char for the sql like operator > > # https://stackoverflow.com/a/63899306/4586180 > > sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\ > > from \n\ > >rawCounts as rc, \n\ > >sample \n\ > > where \n\ > > rc.Name == sample.Name \n'.format( > sampleName ) > > > > rawCountsSDF = self.spark.sql( sqlStmt ) > > rawCountsSDF.createOrReplaceTempView( "rawCounts" > > > > case 2 > >for i in range( 1, len(dfList) ): > > df2 = dfList[i] > > retDF = retDF.join( df2.selectExpr("*"), > on=["Name"] ) > > > > > > I think my out of memory exception maybe because the query plan is huge. I > have not figure out how to figure out if that is my bug or not. My untested > work around is organize the data so that each massive join is run on 1/5 of > the total data set, then union them all together. Each “part” will still > need to iterate 16000 times > > > > In general I assume we want to avoid for loops. I assume Spark is unable > to optimize them. It would be nice if spark provide some sort of join all > function even if it used a for loop to hide this from me > > > > Happy holidays > > > > Andy > > > > > > > > *From: *Sean Owen > *Date: *Friday, December 24, 2021 at 8:30 AM > *To: *Gourav Sengupta > *Cc: *Andrew Davidson , Nicholas Gustafson < > njgustaf...@gmail.com>, User > *Subject: *Re: AnalysisException: Trouble using select() to append > multiple columns > > > > (that's not the situation below we are commenting on) > > On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta > wrote: > > Hi, > > > > try to write several withColumns in a dataframe with functions and then > see the UI for time differences. This should be done with large data sets > of course, in order of a around 200GB + > > > > With scenarios involving nested queries and joins the time differences > shown in the UI becomes a bit more visible. > > > > Regards, > > Gourav Sengupta > > > > On Fri, Dec 24, 2021 at 2:48 PM Sean Owen wrote: > > Nah, it's going to translate to the same plan as the equivalent SQL. > > > > On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta > wrote: > > Hi, > > > > please note that using SQL is much more performant, and easier to manage > these kind of issues. You might want to look at the SPARK UI to see the > advantage of using SQL over dataframes API. > > > > > > Regards, > > Gourav Sengupta > > > > On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson > wrote: > > Thanks Nicholas > > > > Andy > > > > *From: *Nicholas Gustafson > *Date: *Friday, December 17, 2021 at 6:12 PM > *To: *Andrew Davidson > *Cc: *"user@spark.apache.org" > *Subject: *Re: AnalysisException: Trouble using select() to append > multiple columns > > > > Since df1 and df2 are different DataFrames, you will need to use a join. > For examp
OOM Joining thousands of dataframes Was: AnalysisException: Trouble using select() to append multiple columns
Hi Sean and Gourav Thanks for the suggestions. I thought that both the sql and dataframe apis are wrappers around the same frame work? Ie. catalysts. I tend to mix and match my code. Sometimes I find it easier to write using sql some times dataframes. What is considered best practices? Here is an example Case 1 for i in range( 1, len( self.sampleNamesList ) ): # iterate 16000 times! sampleName = self.sampleNamesList[i] sampleDF= quantSparkDFList[i] sampleSDF.createOrReplaceTempView( "sample" ) # the sample name must be quoted else column names with a '-' # like GTEX-1117F-0426-SM-5EGHI will generate an error # spark thinks the '-' is an expression. '_' is also # a special char for the sql like operator # https://stackoverflow.com/a/63899306/4586180 sqlStmt = '\t\t\t\t\t\tselect rc.*, `{}` \n\ from \n\ rawCounts as rc, \n\ sample \n\ where \n\ rc.Name == sample.Name \n'.format( sampleName ) rawCountsSDF = self.spark.sql( sqlStmt ) rawCountsSDF.createOrReplaceTempView( "rawCounts" case 2 for i in range( 1, len(dfList) ): df2 = dfList[i] retDF = retDF.join( df2.selectExpr("*"), on=["Name"] ) I think my out of memory exception maybe because the query plan is huge. I have not figure out how to figure out if that is my bug or not. My untested work around is organize the data so that each massive join is run on 1/5 of the total data set, then union them all together. Each “part” will still need to iterate 16000 times In general I assume we want to avoid for loops. I assume Spark is unable to optimize them. It would be nice if spark provide some sort of join all function even if it used a for loop to hide this from me Happy holidays Andy From: Sean Owen Date: Friday, December 24, 2021 at 8:30 AM To: Gourav Sengupta Cc: Andrew Davidson , Nicholas Gustafson , User Subject: Re: AnalysisException: Trouble using select() to append multiple columns (that's not the situation below we are commenting on) On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta mailto:gourav.sengu...@gmail.com>> wrote: Hi, try to write several withColumns in a dataframe with functions and then see the UI for time differences. This should be done with large data sets of course, in order of a around 200GB + With scenarios involving nested queries and joins the time differences shown in the UI becomes a bit more visible. Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 2:48 PM Sean Owen mailto:sro...@gmail.com>> wrote: Nah, it's going to translate to the same plan as the equivalent SQL. On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta mailto:gourav.sengu...@gmail.com>> wrote: Hi, please note that using SQL is much more performant, and easier to manage these kind of issues. You might want to look at the SPARK UI to see the advantage of using SQL over dataframes API. Regards, Gourav Sengupta On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson wrote: Thanks Nicholas Andy From: Nicholas Gustafson mailto:njgustaf...@gmail.com>> Date: Friday, December 17, 2021 at 6:12 PM To: Andrew Davidson Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" mailto:user@spark.apache.org>> Subject: Re: AnalysisException: Trouble using select() to append multiple columns Since df1 and df2 are different DataFrames, you will need to use a join. For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), on=[“Name”]) On Dec 17, 2021, at 16:25, Andrew Davidson wrote: Hi I am a newbie I have 16,000 data files, all files have the same number of rows and columns. The row ids are identical and are in the same order. I want to create a new data frame that contains the 3rd column from each data file I wrote a test program that uses a for loop and Join. It works with my small test set. I get an OOM when I try to run using the all the data files. I realize that join ( map reduce) is probably not a great solution for my problem Recently I found several articles that take about the challenge with using withColumn() and talk about how to use select() to append columns https://mungingdata.com/pyspark/select-add-columns-withcolumn/ https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop I am using pyspark spark-3.1.2-bin-hadoop3.2 I wrote a little test program. It am able to append columns created using pyspark.sql.function.lit(). I am not able to append columns from other data frames df1 DataFrame[Name: string, ctrl_1: double] +---+--+ | Name|ctrl_1| +---+--+
Re: AnalysisException: Trouble using select() to append multiple columns
(that's not the situation below we are commenting on) On Fri, Dec 24, 2021, 9:28 AM Gourav Sengupta wrote: > Hi, > > try to write several withColumns in a dataframe with functions and then > see the UI for time differences. This should be done with large data sets > of course, in order of a around 200GB + > > With scenarios involving nested queries and joins the time differences > shown in the UI becomes a bit more visible. > > Regards, > Gourav Sengupta > > On Fri, Dec 24, 2021 at 2:48 PM Sean Owen wrote: > >> Nah, it's going to translate to the same plan as the equivalent SQL. >> >> On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta >> wrote: >> >>> Hi, >>> >>> please note that using SQL is much more performant, and easier to manage >>> these kind of issues. You might want to look at the SPARK UI to see the >>> advantage of using SQL over dataframes API. >>> >>> >>> Regards, >>> Gourav Sengupta >>> >>> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson >>> wrote: >>> >>>> Thanks Nicholas >>>> >>>> >>>> >>>> Andy >>>> >>>> >>>> >>>> *From: *Nicholas Gustafson >>>> *Date: *Friday, December 17, 2021 at 6:12 PM >>>> *To: *Andrew Davidson >>>> *Cc: *"user@spark.apache.org" >>>> *Subject: *Re: AnalysisException: Trouble using select() to append >>>> multiple columns >>>> >>>> >>>> >>>> Since df1 and df2 are different DataFrames, you will need to use a >>>> join. For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), >>>> on=[“Name”]) >>>> >>>> >>>> >>>> On Dec 17, 2021, at 16:25, Andrew Davidson >>>> wrote: >>>> >>>> >>>> >>>> Hi I am a newbie >>>> >>>> >>>> >>>> I have 16,000 data files, all files have the same number of rows and >>>> columns. The row ids are identical and are in the same order. I want to >>>> create a new data frame that contains the 3rd column from each data >>>> file >>>> >>>> >>>> >>>> I wrote a test program that uses a for loop and Join. It works with my >>>> small test set. I get an OOM when I try to run using the all the data >>>> files. I realize that join ( map reduce) is probably not a great solution >>>> for my problem >>>> >>>> >>>> >>>> Recently I found several articles that take about the challenge with >>>> using withColumn() and talk about how to use select() to append columns >>>> >>>> >>>> >>>> https://mungingdata.com/pyspark/select-add-columns-withcolumn/ >>>> >>>> >>>> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop >>>> >>>> >>>> >>>> I am using pyspark spark-3.1.2-bin-hadoop3.2 >>>> >>>> >>>> >>>> I wrote a little test program. It am able to append columns created >>>> using pyspark.sql.function.lit(). I am not able to append columns from >>>> other data frames >>>> >>>> >>>> >>>> df1 >>>> >>>> DataFrame[Name: string, ctrl_1: double] >>>> >>>> +---+--+ >>>> >>>> | Name|ctrl_1| >>>> >>>> +---+--+ >>>> >>>> | txId_1| 0.0| >>>> >>>> | txId_2| 11.0| >>>> >>>> | txId_3| 12.0| >>>> >>>> | txId_4| 13.0| >>>> >>>> | txId_5| 14.0| >>>> >>>> | txId_6| 15.0| >>>> >>>> | txId_7| 16.0| >>>> >>>> | txId_8| 17.0| >>>> >>>> | txId_9| 18.0| >>>> >>>> |txId_10| 19.0| >>>> >>>> +---+--+ >>>> >>>> >>>> >>>> # use select to append multiple literals >>>> >>>> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), >>>> pyf.lit("mn0").alias("y")] ) >>>> >>>> >>>> >>>> allDF3 >>>> >>>> DataFrame[Name: string, ctrl_
Re: AnalysisException: Trouble using select() to append multiple columns
Hi, try to write several withColumns in a dataframe with functions and then see the UI for time differences. This should be done with large data sets of course, in order of a around 200GB + With scenarios involving nested queries and joins the time differences shown in the UI becomes a bit more visible. Regards, Gourav Sengupta On Fri, Dec 24, 2021 at 2:48 PM Sean Owen wrote: > Nah, it's going to translate to the same plan as the equivalent SQL. > > On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta > wrote: > >> Hi, >> >> please note that using SQL is much more performant, and easier to manage >> these kind of issues. You might want to look at the SPARK UI to see the >> advantage of using SQL over dataframes API. >> >> >> Regards, >> Gourav Sengupta >> >> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson >> wrote: >> >>> Thanks Nicholas >>> >>> >>> >>> Andy >>> >>> >>> >>> *From: *Nicholas Gustafson >>> *Date: *Friday, December 17, 2021 at 6:12 PM >>> *To: *Andrew Davidson >>> *Cc: *"user@spark.apache.org" >>> *Subject: *Re: AnalysisException: Trouble using select() to append >>> multiple columns >>> >>> >>> >>> Since df1 and df2 are different DataFrames, you will need to use a join. >>> For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), >>> on=[“Name”]) >>> >>> >>> >>> On Dec 17, 2021, at 16:25, Andrew Davidson >>> wrote: >>> >>> >>> >>> Hi I am a newbie >>> >>> >>> >>> I have 16,000 data files, all files have the same number of rows and >>> columns. The row ids are identical and are in the same order. I want to >>> create a new data frame that contains the 3rd column from each data file >>> >>> >>> >>> I wrote a test program that uses a for loop and Join. It works with my >>> small test set. I get an OOM when I try to run using the all the data >>> files. I realize that join ( map reduce) is probably not a great solution >>> for my problem >>> >>> >>> >>> Recently I found several articles that take about the challenge with >>> using withColumn() and talk about how to use select() to append columns >>> >>> >>> >>> https://mungingdata.com/pyspark/select-add-columns-withcolumn/ >>> >>> >>> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop >>> >>> >>> >>> I am using pyspark spark-3.1.2-bin-hadoop3.2 >>> >>> >>> >>> I wrote a little test program. It am able to append columns created >>> using pyspark.sql.function.lit(). I am not able to append columns from >>> other data frames >>> >>> >>> >>> df1 >>> >>> DataFrame[Name: string, ctrl_1: double] >>> >>> +---+--+ >>> >>> | Name|ctrl_1| >>> >>> +---+--+ >>> >>> | txId_1| 0.0| >>> >>> | txId_2| 11.0| >>> >>> | txId_3| 12.0| >>> >>> | txId_4| 13.0| >>> >>> | txId_5| 14.0| >>> >>> | txId_6| 15.0| >>> >>> | txId_7| 16.0| >>> >>> | txId_8| 17.0| >>> >>> | txId_9| 18.0| >>> >>> |txId_10| 19.0| >>> >>> +---+--+ >>> >>> >>> >>> # use select to append multiple literals >>> >>> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), >>> pyf.lit("mn0").alias("y")] ) >>> >>> >>> >>> allDF3 >>> >>> DataFrame[Name: string, ctrl_1: double, x: string, y: string] >>> >>> +---+--+---+---+ >>> >>> | Name|ctrl_1| x| y| >>> >>> +---+--+---+---+ >>> >>> | txId_1| 0.0|abc|mn0| >>> >>> | txId_2| 11.0|abc|mn0| >>> >>> | txId_3| 12.0|abc|mn0| >>> >>> | txId_4| 13.0|abc|mn0| >>> >>> | txId_5| 14.0|abc|mn0| >>> >>> | txId_6| 15.0|abc|mn0| >>> >>> | txId_7| 16.0|abc|mn0| >>> >>> | txId_8| 17.0|abc|mn0| >>> >>> | txId_9| 18.0|abc|mn0| >>> >>> |txId_10| 19.0|abc|mn0| >>> >
Re: AnalysisException: Trouble using select() to append multiple columns
Nah, it's going to translate to the same plan as the equivalent SQL. On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta wrote: > Hi, > > please note that using SQL is much more performant, and easier to manage > these kind of issues. You might want to look at the SPARK UI to see the > advantage of using SQL over dataframes API. > > > Regards, > Gourav Sengupta > > On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson > wrote: > >> Thanks Nicholas >> >> >> >> Andy >> >> >> >> *From: *Nicholas Gustafson >> *Date: *Friday, December 17, 2021 at 6:12 PM >> *To: *Andrew Davidson >> *Cc: *"user@spark.apache.org" >> *Subject: *Re: AnalysisException: Trouble using select() to append >> multiple columns >> >> >> >> Since df1 and df2 are different DataFrames, you will need to use a join. >> For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), >> on=[“Name”]) >> >> >> >> On Dec 17, 2021, at 16:25, Andrew Davidson >> wrote: >> >> >> >> Hi I am a newbie >> >> >> >> I have 16,000 data files, all files have the same number of rows and >> columns. The row ids are identical and are in the same order. I want to >> create a new data frame that contains the 3rd column from each data file >> >> >> >> I wrote a test program that uses a for loop and Join. It works with my >> small test set. I get an OOM when I try to run using the all the data >> files. I realize that join ( map reduce) is probably not a great solution >> for my problem >> >> >> >> Recently I found several articles that take about the challenge with >> using withColumn() and talk about how to use select() to append columns >> >> >> >> https://mungingdata.com/pyspark/select-add-columns-withcolumn/ >> >> >> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop >> >> >> >> I am using pyspark spark-3.1.2-bin-hadoop3.2 >> >> >> >> I wrote a little test program. It am able to append columns created using >> pyspark.sql.function.lit(). I am not able to append columns from other data >> frames >> >> >> >> df1 >> >> DataFrame[Name: string, ctrl_1: double] >> >> +---+--+ >> >> | Name|ctrl_1| >> >> +---+--+ >> >> | txId_1| 0.0| >> >> | txId_2| 11.0| >> >> | txId_3| 12.0| >> >> | txId_4| 13.0| >> >> | txId_5| 14.0| >> >> | txId_6| 15.0| >> >> | txId_7| 16.0| >> >> | txId_8| 17.0| >> >> | txId_9| 18.0| >> >> |txId_10| 19.0| >> >> +---+--+ >> >> >> >> # use select to append multiple literals >> >> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), >> pyf.lit("mn0").alias("y")] ) >> >> >> >> allDF3 >> >> DataFrame[Name: string, ctrl_1: double, x: string, y: string] >> >> +---+--+---+---+ >> >> | Name|ctrl_1| x| y| >> >> +---+--+---+---+ >> >> | txId_1| 0.0|abc|mn0| >> >> | txId_2| 11.0|abc|mn0| >> >> | txId_3| 12.0|abc|mn0| >> >> | txId_4| 13.0|abc|mn0| >> >> | txId_5| 14.0|abc|mn0| >> >> | txId_6| 15.0|abc|mn0| >> >> | txId_7| 16.0|abc|mn0| >> >> | txId_8| 17.0|abc|mn0| >> >> | txId_9| 18.0|abc|mn0| >> >> |txId_10| 19.0|abc|mn0| >> >> +---+--+---+---+ >> >> >> >> df2 >> >> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, >> NumReads: double] >> >> +---+--+---+++ >> >> | Name|Length|EffectiveLength| TPM|NumReads| >> >> +---+--+---+++ >> >> | txId_1| 1500| 1234.5|12.1| 0.1| >> >> | txId_2| 1510| 1244.5|13.1|11.1| >> >> | txId_3| 1520| 1254.5|14.1|12.1| >> >> | txId_4| 1530| 1264.5|15.1|13.1| >> >> | txId_5| 1540| 1274.5|16.1|14.1| >> >> | txId_6| 1550| 1284.5|17.1|15.1| >> >> | txId_7| 1560| 1294.5|18.1|16.1| >> >> | txId_8| 1570| 1304.5|19.1|17.1| >> >> | txId_9| 1580| 1314.5|20.1|18.1| >> >> |txId_10| 1590|
Re: AnalysisException: Trouble using select() to append multiple columns
Hi, please note that using SQL is much more performant, and easier to manage these kind of issues. You might want to look at the SPARK UI to see the advantage of using SQL over dataframes API. Regards, Gourav Sengupta On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson wrote: > Thanks Nicholas > > > > Andy > > > > *From: *Nicholas Gustafson > *Date: *Friday, December 17, 2021 at 6:12 PM > *To: *Andrew Davidson > *Cc: *"user@spark.apache.org" > *Subject: *Re: AnalysisException: Trouble using select() to append > multiple columns > > > > Since df1 and df2 are different DataFrames, you will need to use a join. > For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), > on=[“Name”]) > > > > On Dec 17, 2021, at 16:25, Andrew Davidson > wrote: > > > > Hi I am a newbie > > > > I have 16,000 data files, all files have the same number of rows and > columns. The row ids are identical and are in the same order. I want to > create a new data frame that contains the 3rd column from each data file > > > > I wrote a test program that uses a for loop and Join. It works with my > small test set. I get an OOM when I try to run using the all the data > files. I realize that join ( map reduce) is probably not a great solution > for my problem > > > > Recently I found several articles that take about the challenge with using > withColumn() and talk about how to use select() to append columns > > > > https://mungingdata.com/pyspark/select-add-columns-withcolumn/ > > > https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop > > > > I am using pyspark spark-3.1.2-bin-hadoop3.2 > > > > I wrote a little test program. It am able to append columns created using > pyspark.sql.function.lit(). I am not able to append columns from other data > frames > > > > df1 > > DataFrame[Name: string, ctrl_1: double] > > +---+--+ > > | Name|ctrl_1| > > +---+--+ > > | txId_1| 0.0| > > | txId_2| 11.0| > > | txId_3| 12.0| > > | txId_4| 13.0| > > | txId_5| 14.0| > > | txId_6| 15.0| > > | txId_7| 16.0| > > | txId_8| 17.0| > > | txId_9| 18.0| > > |txId_10| 19.0| > > +---+--+ > > > > # use select to append multiple literals > > allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), > pyf.lit("mn0").alias("y")] ) > > > > allDF3 > > DataFrame[Name: string, ctrl_1: double, x: string, y: string] > > +---+--+---+---+ > > | Name|ctrl_1| x| y| > > +---+--+---+---+ > > | txId_1| 0.0|abc|mn0| > > | txId_2| 11.0|abc|mn0| > > | txId_3| 12.0|abc|mn0| > > | txId_4| 13.0|abc|mn0| > > | txId_5| 14.0|abc|mn0| > > | txId_6| 15.0|abc|mn0| > > | txId_7| 16.0|abc|mn0| > > | txId_8| 17.0|abc|mn0| > > | txId_9| 18.0|abc|mn0| > > |txId_10| 19.0|abc|mn0| > > +---+--+---+---+ > > > > df2 > > DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, > NumReads: double] > > +---+--+---+++ > > | Name|Length|EffectiveLength| TPM|NumReads| > > +---+--+---+++ > > | txId_1| 1500| 1234.5|12.1| 0.1| > > | txId_2| 1510| 1244.5|13.1|11.1| > > | txId_3| 1520| 1254.5|14.1|12.1| > > | txId_4| 1530| 1264.5|15.1|13.1| > > | txId_5| 1540| 1274.5|16.1|14.1| > > | txId_6| 1550| 1284.5|17.1|15.1| > > | txId_7| 1560| 1294.5|18.1|16.1| > > | txId_8| 1570| 1304.5|19.1|17.1| > > | txId_9| 1580| 1314.5|20.1|18.1| > > |txId_10| 1590| 1324.5|21.1|19.1| > > +---+--+---+++ > > > > s2Col = df2["NumReads"].alias( 'ctrl_2' ) > > print("type(s2Col) = {}".format(type(s2Col)) ) > > > > type(s2Col) = > > > > allDF4 = df1.select( ["*", s2Col] ) > > ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py > in select(self, *cols) > > * 1667* [Row(name='Alice', age=12), Row(name='Bob', age=15)] > > * 1668* """ > > -> 1669 jdf = self._jdf.select(self._jcols(*cols)) > > * 1670* return DataFrame(jdf, self.sql_ctx) > > * 1671* > > > > ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py >
Re: AnalysisException: Trouble using select() to append multiple columns
Thanks Nicholas Andy From: Nicholas Gustafson Date: Friday, December 17, 2021 at 6:12 PM To: Andrew Davidson Cc: "user@spark.apache.org" Subject: Re: AnalysisException: Trouble using select() to append multiple columns Since df1 and df2 are different DataFrames, you will need to use a join. For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), on=[“Name”]) On Dec 17, 2021, at 16:25, Andrew Davidson wrote: Hi I am a newbie I have 16,000 data files, all files have the same number of rows and columns. The row ids are identical and are in the same order. I want to create a new data frame that contains the 3rd column from each data file I wrote a test program that uses a for loop and Join. It works with my small test set. I get an OOM when I try to run using the all the data files. I realize that join ( map reduce) is probably not a great solution for my problem Recently I found several articles that take about the challenge with using withColumn() and talk about how to use select() to append columns https://mungingdata.com/pyspark/select-add-columns-withcolumn/ https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop I am using pyspark spark-3.1.2-bin-hadoop3.2 I wrote a little test program. It am able to append columns created using pyspark.sql.function.lit(). I am not able to append columns from other data frames df1 DataFrame[Name: string, ctrl_1: double] +---+--+ | Name|ctrl_1| +---+--+ | txId_1| 0.0| | txId_2| 11.0| | txId_3| 12.0| | txId_4| 13.0| | txId_5| 14.0| | txId_6| 15.0| | txId_7| 16.0| | txId_8| 17.0| | txId_9| 18.0| |txId_10| 19.0| +---+--+ # use select to append multiple literals allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")] ) allDF3 DataFrame[Name: string, ctrl_1: double, x: string, y: string] +---+--+---+---+ | Name|ctrl_1| x| y| +---+--+---+---+ | txId_1| 0.0|abc|mn0| | txId_2| 11.0|abc|mn0| | txId_3| 12.0|abc|mn0| | txId_4| 13.0|abc|mn0| | txId_5| 14.0|abc|mn0| | txId_6| 15.0|abc|mn0| | txId_7| 16.0|abc|mn0| | txId_8| 17.0|abc|mn0| | txId_9| 18.0|abc|mn0| |txId_10| 19.0|abc|mn0| +---+--+---+---+ df2 DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double] +---+--+---+++ | Name|Length|EffectiveLength| TPM|NumReads| +---+--+---+++ | txId_1| 1500| 1234.5|12.1| 0.1| | txId_2| 1510| 1244.5|13.1|11.1| | txId_3| 1520| 1254.5|14.1|12.1| | txId_4| 1530| 1264.5|15.1|13.1| | txId_5| 1540| 1274.5|16.1|14.1| | txId_6| 1550| 1284.5|17.1|15.1| | txId_7| 1560| 1294.5|18.1|16.1| | txId_8| 1570| 1304.5|19.1|17.1| | txId_9| 1580| 1314.5|20.1|18.1| |txId_10| 1590| 1324.5|21.1|19.1| +---+--+---+++ s2Col = df2["NumReads"].alias( 'ctrl_2' ) print("type(s2Col) = {}".format(type(s2Col)) ) type(s2Col) = allDF4 = df1.select( ["*", s2Col] ) ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols) 1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)] 1668 """ -> 1669 jdf = self._jdf.select(self._jcols(*cols)) 1670 return DataFrame(jdf, self.sql_ctx) 1671 ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw) 115 # Hide where the exception came from that shows a non-Pythonic 116 # JVM exception message. --> 117 raise converted from None 118 else: 119 raise AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].; !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550] +- Project [Name#0, NumReads#4 AS ctrl_1#2447] +- Project [Name#0, NumReads#4] +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv Any idea what my bug is? Kind regards Andy
Re: AnalysisException: Trouble using select() to append multiple columns
Since df1 and df2 are different DataFrames, you will need to use a join. For example: df1.join(df2.selectExpr(“Name”, “NumReads as ctrl_2”), on=[“Name”]) > On Dec 17, 2021, at 16:25, Andrew Davidson wrote: > > > Hi I am a newbie > > I have 16,000 data files, all files have the same number of rows and columns. > The row ids are identical and are in the same order. I want to create a new > data frame that contains the 3rd column from each data file > > I wrote a test program that uses a for loop and Join. It works with my small > test set. I get an OOM when I try to run using the all the data files. I > realize that join ( map reduce) is probably not a great solution for my > problem > > Recently I found several articles that take about the challenge with using > withColumn() and talk about how to use select() to append columns > > https://mungingdata.com/pyspark/select-add-columns-withcolumn/ > https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop > > I am using pyspark spark-3.1.2-bin-hadoop3.2 > > I wrote a little test program. It am able to append columns created using > pyspark.sql.function.lit(). I am not able to append columns from other data > frames > > df1 > DataFrame[Name: string, ctrl_1: double] > +---+--+ > | Name|ctrl_1| > +---+--+ > | txId_1| 0.0| > | txId_2| 11.0| > | txId_3| 12.0| > | txId_4| 13.0| > | txId_5| 14.0| > | txId_6| 15.0| > | txId_7| 16.0| > | txId_8| 17.0| > | txId_9| 18.0| > |txId_10| 19.0| > +---+--+ > > # use select to append multiple literals > allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), > pyf.lit("mn0").alias("y")] ) > > allDF3 > DataFrame[Name: string, ctrl_1: double, x: string, y: string] > +---+--+---+---+ > | Name|ctrl_1| x| y| > +---+--+---+---+ > | txId_1| 0.0|abc|mn0| > | txId_2| 11.0|abc|mn0| > | txId_3| 12.0|abc|mn0| > | txId_4| 13.0|abc|mn0| > | txId_5| 14.0|abc|mn0| > | txId_6| 15.0|abc|mn0| > | txId_7| 16.0|abc|mn0| > | txId_8| 17.0|abc|mn0| > | txId_9| 18.0|abc|mn0| > |txId_10| 19.0|abc|mn0| > +---+--+---+---+ > > df2 > DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, > NumReads: double] > +---+--+---+++ > | Name|Length|EffectiveLength| TPM|NumReads| > +---+--+---+++ > | txId_1| 1500| 1234.5|12.1| 0.1| > | txId_2| 1510| 1244.5|13.1|11.1| > | txId_3| 1520| 1254.5|14.1|12.1| > | txId_4| 1530| 1264.5|15.1|13.1| > | txId_5| 1540| 1274.5|16.1|14.1| > | txId_6| 1550| 1284.5|17.1|15.1| > | txId_7| 1560| 1294.5|18.1|16.1| > | txId_8| 1570| 1304.5|19.1|17.1| > | txId_9| 1580| 1314.5|20.1|18.1| > |txId_10| 1590| 1324.5|21.1|19.1| > +---+--+---+++ > > s2Col = df2["NumReads"].alias( 'ctrl_2' ) > print("type(s2Col) = {}".format(type(s2Col)) ) > > type(s2Col) = > > allDF4 = df1.select( ["*", s2Col] ) > ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py > in select(self, *cols) >1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)] >1668 """ > -> 1669 jdf = self._jdf.select(self._jcols(*cols)) >1670 return DataFrame(jdf, self.sql_ctx) >1671 > > ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py > in __call__(self, *args) >1303 answer = self.gateway_client.send_command(command) >1304 return_value = get_return_value( > -> 1305 answer, self.gateway_client, self.target_id, self.name) >1306 >1307 for temp_arg in temp_args: > > ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py > in deco(*a, **kw) > 115 # Hide where the exception came from that shows a > non-Pythonic > 116 # JVM exception message. > --> 117 raise converted from None > 118 else: > 119 raise > > AnalysisException: Resolved attribute(s) NumReads#14 missing from > Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS > ctrl_2#2550].; > !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550] > +- Project [Name#0, NumReads#4 AS ctrl_1#2447] >+- Project [Name#0, NumReads#4] > +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv > > Any idea what my bug is? > > Kind regards > > Andy
AnalysisException: Trouble using select() to append multiple columns
Hi I am a newbie I have 16,000 data files, all files have the same number of rows and columns. The row ids are identical and are in the same order. I want to create a new data frame that contains the 3rd column from each data file I wrote a test program that uses a for loop and Join. It works with my small test set. I get an OOM when I try to run using the all the data files. I realize that join ( map reduce) is probably not a great solution for my problem Recently I found several articles that take about the challenge with using withColumn() and talk about how to use select() to append columns https://mungingdata.com/pyspark/select-add-columns-withcolumn/ https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop I am using pyspark spark-3.1.2-bin-hadoop3.2 I wrote a little test program. It am able to append columns created using pyspark.sql.function.lit(). I am not able to append columns from other data frames df1 DataFrame[Name: string, ctrl_1: double] +---+--+ | Name|ctrl_1| +---+--+ | txId_1| 0.0| | txId_2| 11.0| | txId_3| 12.0| | txId_4| 13.0| | txId_5| 14.0| | txId_6| 15.0| | txId_7| 16.0| | txId_8| 17.0| | txId_9| 18.0| |txId_10| 19.0| +---+--+ # use select to append multiple literals allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")] ) allDF3 DataFrame[Name: string, ctrl_1: double, x: string, y: string] +---+--+---+---+ | Name|ctrl_1| x| y| +---+--+---+---+ | txId_1| 0.0|abc|mn0| | txId_2| 11.0|abc|mn0| | txId_3| 12.0|abc|mn0| | txId_4| 13.0|abc|mn0| | txId_5| 14.0|abc|mn0| | txId_6| 15.0|abc|mn0| | txId_7| 16.0|abc|mn0| | txId_8| 17.0|abc|mn0| | txId_9| 18.0|abc|mn0| |txId_10| 19.0|abc|mn0| +---+--+---+---+ df2 DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double] +---+--+---+++ | Name|Length|EffectiveLength| TPM|NumReads| +---+--+---+++ | txId_1| 1500| 1234.5|12.1| 0.1| | txId_2| 1510| 1244.5|13.1|11.1| | txId_3| 1520| 1254.5|14.1|12.1| | txId_4| 1530| 1264.5|15.1|13.1| | txId_5| 1540| 1274.5|16.1|14.1| | txId_6| 1550| 1284.5|17.1|15.1| | txId_7| 1560| 1294.5|18.1|16.1| | txId_8| 1570| 1304.5|19.1|17.1| | txId_9| 1580| 1314.5|20.1|18.1| |txId_10| 1590| 1324.5|21.1|19.1| +---+--+---+++ s2Col = df2["NumReads"].alias( 'ctrl_2' ) print("type(s2Col) = {}".format(type(s2Col)) ) type(s2Col) = allDF4 = df1.select( ["*", s2Col] ) ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols) 1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)] 1668 """ -> 1669 jdf = self._jdf.select(self._jcols(*cols)) 1670 return DataFrame(jdf, self.sql_ctx) 1671 ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1303 answer = self.gateway_client.send_command(command) 1304 return_value = get_return_value( -> 1305 answer, self.gateway_client, self.target_id, self.name) 1306 1307 for temp_arg in temp_args: ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw) 115 # Hide where the exception came from that shows a non-Pythonic 116 # JVM exception message. --> 117 raise converted from None 118 else: 119 raise AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].; !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550] +- Project [Name#0, NumReads#4 AS ctrl_1#2447] +- Project [Name#0, NumReads#4] +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv Any idea what my bug is? Kind regards Andy