Re: AnalysisException: Trouble using select() to append multiple columns

2021-12-17 Thread Nicholas Gustafson
Since df1 and df2 are different DataFrames, you will need to use a join. For
example: df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
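
A minimal, self-contained sketch of that join (the file paths and read options
are illustrative assumptions; df1 and df2 stand in for the DataFrames in the
question below):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical paths; each file has Name, Length, EffectiveLength, TPM, NumReads.
    df1 = (spark.read.csv("ctrl_1.csv", header=True, inferSchema=True)
                .selectExpr("Name", "NumReads as ctrl_1"))
    df2 = spark.read.csv("ctrl_2.csv", header=True, inferSchema=True)

    # Columns passed to select() must resolve against the DataFrame you call it on,
    # which is why selecting df2["NumReads"] from df1 raises an AnalysisException.
    # Pulling the renamed column across with a join on the shared Name key works:
    allDF = df1.join(df2.selectExpr("Name", "NumReads as ctrl_2"), on=["Name"])
    allDF.show()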

> On Dec 17, 2021, at 16:25, Andrew Davidson  wrote:
> 
> 
> Hi, I am a newbie.
>  
> I have 16,000 data files; all files have the same number of rows and columns.
> The row ids are identical and are in the same order. I want to create a new
> data frame that contains the 3rd column from each data file.
>  
> I wrote a test program that uses a for loop and join. It works with my small
> test set. I get an OOM when I try to run using all of the data files. I
> realize that join (map reduce) is probably not a great solution for my
> problem.
>  
> Recently I found several articles that talk about the challenges of using
> withColumn() and explain how to use select() to append columns:
>  
> https://mungingdata.com/pyspark/select-add-columns-withcolumn/
> https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
>  
> I am using pyspark spark-3.1.2-bin-hadoop3.2
>  
> I wrote a little test program. I am able to append columns created using
> pyspark.sql.functions.lit(). I am not able to append columns from other data
> frames.
>  
> df1
> DataFrame[Name: string, ctrl_1: double]
> +-------+------+
> |   Name|ctrl_1|
> +-------+------+
> | txId_1|   0.0|
> | txId_2|  11.0|
> | txId_3|  12.0|
> | txId_4|  13.0|
> | txId_5|  14.0|
> | txId_6|  15.0|
> | txId_7|  16.0|
> | txId_8|  17.0|
> | txId_9|  18.0|
> |txId_10|  19.0|
> +-------+------+
>  
> # use select to append multiple literals
> allDF3 = df1.select(["*", pyf.lit("abc").alias("x"), pyf.lit("mn0").alias("y")])
>  
> allDF3
> DataFrame[Name: string, ctrl_1: double, x: string, y: string]
> +-------+------+---+---+
> |   Name|ctrl_1|  x|  y|
> +-------+------+---+---+
> | txId_1|   0.0|abc|mn0|
> | txId_2|  11.0|abc|mn0|
> | txId_3|  12.0|abc|mn0|
> | txId_4|  13.0|abc|mn0|
> | txId_5|  14.0|abc|mn0|
> | txId_6|  15.0|abc|mn0|
> | txId_7|  16.0|abc|mn0|
> | txId_8|  17.0|abc|mn0|
> | txId_9|  18.0|abc|mn0|
> |txId_10|  19.0|abc|mn0|
> +-------+------+---+---+
>  
> df2
> DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, 
> NumReads: double]
> +-------+------+---------------+----+--------+
> |   Name|Length|EffectiveLength| TPM|NumReads|
> +-------+------+---------------+----+--------+
> | txId_1|  1500|         1234.5|12.1|     0.1|
> | txId_2|  1510|         1244.5|13.1|    11.1|
> | txId_3|  1520|         1254.5|14.1|    12.1|
> | txId_4|  1530|         1264.5|15.1|    13.1|
> | txId_5|  1540|         1274.5|16.1|    14.1|
> | txId_6|  1550|         1284.5|17.1|    15.1|
> | txId_7|  1560|         1294.5|18.1|    16.1|
> | txId_8|  1570|         1304.5|19.1|    17.1|
> | txId_9|  1580|         1314.5|20.1|    18.1|
> |txId_10|  1590|         1324.5|21.1|    19.1|
> +-------+------+---------------+----+--------+
>  
> s2Col = df2["NumReads"].alias( 'ctrl_2' )
> print("type(s2Col) = {}".format(type(s2Col)) )
>  
> type(s2Col) = <class 'pyspark.sql.column.Column'>
>  
> allDF4 = df1.select( ["*", s2Col] )
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
>1667 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
>1668 """
> -> 1669 jdf = self._jdf.select(self._jcols(*cols))
>1670 return DataFrame(jdf, self.sql_ctx)
>1671 
>  
> ../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>1303 answer = self.gateway_client.send_command(command)
>1304 return_value = get_return_value(
> -> 1305 answer, self.gateway_client, self.target_id, self.name)
>1306 
>1307 for temp_arg in temp_args:
>  
> ~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
> 115     # Hide where the exception came from that shows a non-Pythonic
> 116     # JVM exception message.
> --> 117 raise converted from None
> 118 else:
> 119 raise
>  
> AnalysisException: Resolved attribute(s) NumReads#14 missing from 
> Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS 
> ctrl_2#2550].;
> !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
> +- Project [Name#0, NumReads#4 AS ctrl_1#2447]
>+- Project [Name#0, NumReads#4]
>   +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv
>  
> Any idea what my bug is?
>  
> Kind regards
>  
> Andy


Re: unit testing for spark code

2021-03-22 Thread Nicholas Gustafson
I've found pytest works well if you're using PySpark. Though if you have a
lot of tests, running them all can be pretty slow.
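
For example, a minimal pytest sketch with a shared SparkSession fixture (the
fixture and test names are just illustrative; assumes pyspark is installed in
the test environment):

    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope="session")
    def spark():
        # One local SparkSession for the whole test session; JVM startup is
        # what makes large PySpark test suites slow, so reuse it.
        spark = (SparkSession.builder
                 .master("local[2]")
                 .appName("unit-tests")
                 .getOrCreate())
        yield spark
        spark.stop()

    def test_doubles_column(spark):
        df = spark.createDataFrame([(1,), (2,)], ["x"])
        result = df.selectExpr("x * 2 as y").collect()
        assert [row.y for row in result] == [2, 4]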

On Mon, Mar 22, 2021 at 6:32 AM Amit Sharma  wrote:

> Hi, can we write unit tests for Spark code? Is there any specific
> framework?
>
>
> Thanks
> Amit
>


Re: How can I use pyspark to upsert one row without replacing entire table

2020-08-12 Thread Nicholas Gustafson
The delta docs have examples of upserting:

https://docs.delta.io/0.4.0/delta-update.html#upsert-into-a-table-using-merge
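
For example, a minimal PySpark sketch of that merge pattern (the table path,
column names, and updates DataFrame are illustrative assumptions; it assumes
the delta package is available and the session is configured for Delta):

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical existing Delta table plus a DataFrame of rows to upsert.
    target = DeltaTable.forPath(spark, "/tmp/people_delta")
    updates = spark.createDataFrame([(2, "Michael")], ["id", "fname"])

    # Update the matching row by id, insert the row if no match exists.
    (target.alias("t")
        .merge(updates.alias("u"), "t.id = u.id")
        .whenMatchedUpdate(set={"fname": "u.fname"})
        .whenNotMatchedInsertAll()
        .execute())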

> On Aug 12, 2020, at 08:31, Siavash Namvar  wrote:
> 
> 
> Thanks Sean,
> 
> Do you have any URL or reference to help me learn how to upsert in Spark? I
> need to update a Sybase db.
> 
>> On Wed, Aug 12, 2020 at 11:06 AM Sean Owen  wrote:
>> It's not so much Spark as the data format, and whether it supports
>> upserts. Parquet, CSV, JSON, etc. would not.
>> That is what Delta, Hudi, et al. are for, and yes, you can upsert them in Spark.
>> 
>> On Wed, Aug 12, 2020 at 9:57 AM Siavash Namvar  wrote:
>> >
>> > Hi,
>> >
>> > I have a use case where I read data from a db table and need to update a
>> > few rows based on the primary key without replacing the entire table.
>> >
>> > For instance, if I have the 3 following rows:
>> >
>> > ----------
>> > id | fname
>> > ----------
>> >  1 | john
>> > ----------
>> >  2 | Steve
>> > ----------
>> >  3 | Jack
>> > ----------
>> >
>> > And I would like to update the row with id=2 from Steve to Michael without
>> > replacing the entire table, so the output looks like:
>> >
>> > ----------
>> > id | fname
>> > ----------
>> >  1 | john
>> > ----------
>> >  2 | Michael
>> > ----------
>> >  3 | Jack
>> > ----------
>> >
>> > Keep in mind the actual db table is huge and the database is old, so I
>> > cannot read and replace the entire table.
>> >
>> > Thanks