Hi,

please note that using SQL is much more performant and makes these kinds of
issues easier to manage. You might want to look at the Spark UI to see the
advantage of using SQL over the DataFrame API.
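
For example, something along these lines (a rough, untested sketch; the file
paths and the "spark" session variable are placeholders, and the Name/NumReads
columns are taken from the thread below):

    # rough sketch, untested: file paths and the "spark" session are assumptions
    df1 = spark.read.csv("ctrl_1_quant.csv", header=True, inferSchema=True)
    df2 = spark.read.csv("ctrl_2_quant.csv", header=True, inferSchema=True)
    df1.createOrReplaceTempView("quant_1")
    df2.createOrReplaceTempView("quant_2")

    joinedDF = spark.sql("""
        SELECT a.Name, a.NumReads AS ctrl_1, b.NumReads AS ctrl_2
        FROM quant_1 a
        JOIN quant_2 b ON a.Name = b.Name
    """)
    joinedDF.explain()  # inspect the plan here and in the Spark UI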


Regards,
Gourav Sengupta

On Sat, Dec 18, 2021 at 5:40 PM Andrew Davidson <aedav...@ucsc.edu.invalid>
wrote:

> Thanks Nicholas
>
>
>
> Andy
>
>
>
> From: Nicholas Gustafson <njgustaf...@gmail.com>
> Date: Friday, December 17, 2021 at 6:12 PM
> To: Andrew Davidson <aedav...@ucsc.edu.invalid>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: AnalysisException: Trouble using select() to append multiple columns
>
>
>
> Since df1 and df2 are different DataFrames, you will need to use a join.
> For example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"),
> on=["Name"])
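>
> Spelled out a bit more (a minimal, untested sketch using the df1 and df2
> shown further down in this thread):
>
> # columns from a different DataFrame cannot be selected directly,
> # so bring them over with a join on the shared Name column
> ctrl2DF = df2.selectExpr("Name", "NumReads as ctrl_2")
> allDF4 = df1.join(ctrl2DF, on=["Name"])
> allDF4.show()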
>
>
>
> On Dec 17, 2021, at 16:25, Andrew Davidson <aedav...@ucsc.edu.invalid>
> wrote:
>
>
>
> Hi, I am a newbie.
>
>
>
> I have 16,000 data files; all of them have the same number of rows and
> columns. The row IDs are identical and in the same order. I want to
> create a new data frame that contains the 3rd column from each data file.
>
>
>
> I wrote a test program that uses a for loop and join. It works with my
> small test set, but I get an OOM when I try to run it on all the data
> files. I realize that join (map reduce) is probably not a great solution
> for my problem.
>
>
>
> Recently I found several articles that talk about the challenges of using
> withColumn() and explain how to use select() to append columns:
>
>
>
> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
>
>
> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>
>
>
> I am using pyspark spark-3.1.2-bin-hadoop3.2
>
>
>
> I wrote a little test program. I am able to append columns created using
> pyspark.sql.functions.lit(). I am not able to append columns from other
> data frames.
>
>
>
> df1
> DataFrame[Name: string, ctrl_1: double]
>
> +-------+------+
> |   Name|ctrl_1|
> +-------+------+
> | txId_1|   0.0|
> | txId_2|  11.0|
> | txId_3|  12.0|
> | txId_4|  13.0|
> | txId_5|  14.0|
> | txId_6|  15.0|
> | txId_7|  16.0|
> | txId_8|  17.0|
> | txId_9|  18.0|
> |txId_10|  19.0|
> +-------+------+
>
>
>
> # use select to append multiple literals
> allDF3 = df1.select( ["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")] )
>
> allDF3
> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
>
> +-------+------+---+---+
> |   Name|ctrl_1|  x|  y|
> +-------+------+---+---+
> | txId_1|   0.0|abc|mn0|
> | txId_2|  11.0|abc|mn0|
> | txId_3|  12.0|abc|mn0|
> | txId_4|  13.0|abc|mn0|
> | txId_5|  14.0|abc|mn0|
> | txId_6|  15.0|abc|mn0|
> | txId_7|  16.0|abc|mn0|
> | txId_8|  17.0|abc|mn0|
> | txId_9|  18.0|abc|mn0|
> |txId_10|  19.0|abc|mn0|
> +-------+------+---+---+
>
>
>
> df2
> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double]
>
> +-------+------+---------------+----+--------+
> |   Name|Length|EffectiveLength| TPM|NumReads|
> +-------+------+---------------+----+--------+
> | txId_1|  1500|         1234.5|12.1|     0.1|
> | txId_2|  1510|         1244.5|13.1|    11.1|
> | txId_3|  1520|         1254.5|14.1|    12.1|
> | txId_4|  1530|         1264.5|15.1|    13.1|
> | txId_5|  1540|         1274.5|16.1|    14.1|
> | txId_6|  1550|         1284.5|17.1|    15.1|
> | txId_7|  1560|         1294.5|18.1|    16.1|
> | txId_8|  1570|         1304.5|19.1|    17.1|
> | txId_9|  1580|         1314.5|20.1|    18.1|
> |txId_10|  1590|         1324.5|21.1|    19.1|
> +-------+------+---------------+----+--------+
>
>
>
> s2Col = df2["NumReads"].alias( 'ctrl_2' )
> print("type(s2Col) = {}".format(type(s2Col)) )
>
> type(s2Col) = <class 'pyspark.sql.column.Column'>
>
>
>
> allDF4 = df1.select( ["*", s2Col] )
>
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
>    1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>    1668         """
> -> 1669         jdf = self._jdf.select(self._jcols(*cols))
>    1670         return DataFrame(jdf, self.sql_ctx)
>    1671
>
> ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>    1303         answer = self.gateway_client.send_command(command)
>    1304         return_value = get_return_value(
> -> 1305             answer, self.gateway_client, self.target_id, self.name)
>    1306
>    1307         for temp_arg in temp_args:
>
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
>     115                 # Hide where the exception came from that shows a non-Pythonic
>     116                 # JVM exception message.
> --> 117                 raise converted from None
>     118             else:
>     119                 raise
>
> AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].;
> !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
> +- Project [Name#0, NumReads#4 AS ctrl_1#2447]
>    +- Project [Name#0, NumReads#4]
>       +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv
>
>
>
> Any idea what my bug is?
>
>
>
> Kind regards
>
>
>
> Andy
>
>
