Nah, it's going to translate to the same plan as the equivalent SQL.
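A quick way to check this is to print the plans for both versions and compare. A minimal sketch, assuming a running SparkSession; the file path, separator, and view name are illustrative guesses, not details from this thread:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical per-sample input file with the columns shown further down.
    df = spark.read.csv("quant.tsv", sep="\t", header=True, inferSchema=True)
    df.createOrReplaceTempView("quant")

    # DataFrame API version of a simple projection.
    df.select("Name", "NumReads").explain()

    # SQL version of the same query; it should print the same physical plan.
    spark.sql("SELECT Name, NumReads FROM quant").explain()

Identical explain() output means the optimizer sees the same plan, whichever front end produced it.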
On Fri, Dec 24, 2021, 5:09 AM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> Please note that using SQL is much more performant and easier to manage
> for these kinds of issues. You might want to look at the Spark UI to see
> the advantage of using SQL over the DataFrame API.
>
> Regards,
> Gourav Sengupta
>
> On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:
>
>> Thanks Nicholas
>>
>> Andy
>>
>> From: Nicholas Gustafson <njgustaf...@gmail.com>
>> Date: Friday, December 17, 2021 at 6:12 PM
>> To: Andrew Davidson <aedav...@ucsc.edu.invalid>
>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: AnalysisException: Trouble using select() to append multiple columns
>>
>> Since df1 and df2 are different DataFrames, you will need to use a join.
>> For example:
>>
>>     df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
>>
>> On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:
>>
>> Hi, I am a newbie.
>>
>> I have 16,000 data files; all files have the same number of rows and
>> columns. The row IDs are identical and are in the same order. I want to
>> create a new data frame that contains the 3rd column from each data file.
>>
>> I wrote a test program that uses a for loop and join. It works with my
>> small test set, but I get an OOM when I try to run it using all the data
>> files. I realize that join (map reduce) is probably not a great solution
>> for my problem.
>>
>> Recently I found several articles that talk about the challenges of
>> using withColumn() and how to use select() to append columns instead:
>>
>> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
>> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>>
>> I am using pyspark spark-3.1.2-bin-hadoop3.2.
>>
>> I wrote a little test program. I am able to append columns created using
>> pyspark.sql.functions.lit().
>> I am not able to append columns from other data frames:
>>
>> df1
>> DataFrame[Name: string, ctrl_1: double]
>> +-------+------+
>> |   Name|ctrl_1|
>> +-------+------+
>> | txId_1|   0.0|
>> | txId_2|  11.0|
>> | txId_3|  12.0|
>> | txId_4|  13.0|
>> | txId_5|  14.0|
>> | txId_6|  15.0|
>> | txId_7|  16.0|
>> | txId_8|  17.0|
>> | txId_9|  18.0|
>> |txId_10|  19.0|
>> +-------+------+
>>
>> # use select to append multiple literals
>> import pyspark.sql.functions as pyf
>> allDF3 = df1.select(["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")])
>>
>> allDF3
>> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
>> +-------+------+---+---+
>> |   Name|ctrl_1|  x|  y|
>> +-------+------+---+---+
>> | txId_1|   0.0|abc|mn0|
>> | txId_2|  11.0|abc|mn0|
>> | txId_3|  12.0|abc|mn0|
>> | txId_4|  13.0|abc|mn0|
>> | txId_5|  14.0|abc|mn0|
>> | txId_6|  15.0|abc|mn0|
>> | txId_7|  16.0|abc|mn0|
>> | txId_8|  17.0|abc|mn0|
>> | txId_9|  18.0|abc|mn0|
>> |txId_10|  19.0|abc|mn0|
>> +-------+------+---+---+
>>
>> df2
>> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double]
>> +-------+------+---------------+----+--------+
>> |   Name|Length|EffectiveLength| TPM|NumReads|
>> +-------+------+---------------+----+--------+
>> | txId_1|  1500|         1234.5|12.1|     0.1|
>> | txId_2|  1510|         1244.5|13.1|    11.1|
>> | txId_3|  1520|         1254.5|14.1|    12.1|
>> | txId_4|  1530|         1264.5|15.1|    13.1|
>> | txId_5|  1540|         1274.5|16.1|    14.1|
>> | txId_6|  1550|         1284.5|17.1|    15.1|
>> | txId_7|  1560|         1294.5|18.1|    16.1|
>> | txId_8|  1570|         1304.5|19.1|    17.1|
>> | txId_9|  1580|         1314.5|20.1|    18.1|
>> |txId_10|  1590|         1324.5|21.1|    19.1|
>> +-------+------+---------------+----+--------+
>>
>> s2Col = df2["NumReads"].alias('ctrl_2')
>> print("type(s2Col) = {}".format(type(s2Col)))
>>
>> type(s2Col) = <class 'pyspark.sql.column.Column'>
>>
>> allDF4 = df1.select(["*", s2Col])
>>
>> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
>>    1667        [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>>    1668        """
>> -> 1669        jdf = self._jdf.select(self._jcols(*cols))
>>    1670        return DataFrame(jdf, self.sql_ctx)
>>    1671
>>
>> ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>    1303        answer = self.gateway_client.send_command(command)
>>    1304        return_value = get_return_value(
>> -> 1305            answer, self.gateway_client, self.target_id, self.name)
>>    1306
>>    1307        for temp_arg in temp_args:
>>
>> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
>>     115        # Hide where the exception came from that shows a non-Pythonic
>>     116        # JVM exception message.
>> --> 117        raise converted from None
>>     118    else:
>>     119        raise
>>
>> AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].;
>> !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
>> +- Project [Name#0, NumReads#4 AS ctrl_1#2447]
>>    +- Project [Name#0, NumReads#4]
>>       +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv
>>
>> Any idea what my bug is?
>>
>> Kind regards
>>
>> Andy
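To make Nicholas's join fix above concrete, here is a minimal, self-contained sketch. The toy rows and the reduce loop at the end are illustrative assumptions, not code from the thread; only the join itself is from Nicholas's reply.

    import functools

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Toy stand-ins for the df1 and df2 shown above (rows abbreviated).
    df1 = spark.createDataFrame(
        [("txId_1", 0.0), ("txId_2", 11.0)], ["Name", "ctrl_1"])
    df2 = spark.createDataFrame(
        [("txId_1", 1500, 1234.5, 12.1, 0.1),
         ("txId_2", 1510, 1244.5, 13.1, 11.1)],
        ["Name", "Length", "EffectiveLength", "TPM", "NumReads"])

    # df1.select(["*", df2["NumReads"]]) raises the AnalysisException above:
    # a Column is bound to the DataFrame it came from, so df2's NumReads
    # attribute cannot be resolved against df1. Join on the shared key
    # instead:
    allDF4 = df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
    allDF4.show()

    # The same pattern folded over many DataFrames, one per data file.
    dfs = [allDF4]  # in practice: one projected DataFrame per input file
    wide = functools.reduce(lambda a, b: a.join(b, on=["Name"]), dfs)

Note that this fixes the AnalysisException, not the OOM: folding 16,000 joins still builds a very deep plan, which is consistent with what Andy saw in his for-loop-and-join test program.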