Hi, I am a newbie.

I have 16,000 data files. All files have the same number of rows and columns, 
the row ids are identical, and they are in the same order. I want to create a 
new data frame that contains the 3rd column from each data file.

I wrote a test program that uses a for loop and join(). It works with my small 
test set, but I get an OOM when I try to run it on all the data files. I 
realize that join (map/reduce) is probably not a great solution for my problem.
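
For reference, here is a minimal sketch of the kind of loop I mean (the file 
paths and count are made-up stand-ins for my 16,000 real files):

from pyspark.sql import SparkSession
import pyspark.sql.functions as pyf

spark = SparkSession.builder.getOrCreate()

# stand-in for the 16,000 real files
paths = ["sample_{}.csv".format(i) for i in range(1, 4)]

result = None
for i, path in enumerate(paths, start=1):
    df = spark.read.csv(path, header=True, inferSchema=True)
    # keep the row id plus the 3rd column, renamed per file
    cur = df.select("Name", pyf.col(df.columns[2]).alias("ctrl_{}".format(i)))
    # each pass joins on the row id, so the plan grows with every file
    result = cur if result is None else result.join(cur, on="Name", how="inner")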

Recently I found several articles that talk about the challenge of using 
withColumn() in a loop and how to use select() to append columns instead:

https://mungingdata.com/pyspark/select-add-columns-withcolumn/
https://stackoverflow.com/questions/64627112/adding-multiple-columns-in-pyspark-dataframe-using-a-loop
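
As I understand those articles, the idea is to build the new columns up front 
and project once with select(), rather than calling withColumn() repeatedly. A 
rough sketch of the contrast (the column names are made up):

import pyspark.sql.functions as pyf

# withColumn() in a loop adds one projection to the plan per call
df = df1
for i in range(3):
    df = df.withColumn("c{}".format(i), pyf.lit(i))

# the select() alternative builds the columns first and projects once
df = df1.select(["*"] + [pyf.lit(i).alias("c{}".format(i)) for i in range(3)])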


I am using PySpark from spark-3.1.2-bin-hadoop3.2.

I wrote a little test program. I am able to append columns created using 
pyspark.sql.functions.lit(), but I am not able to append columns from other 
data frames.


df1

DataFrame[Name: string, ctrl_1: double]

+-------+------+
|   Name|ctrl_1|
+-------+------+
| txId_1|   0.0|
| txId_2|  11.0|
| txId_3|  12.0|
| txId_4|  13.0|
| txId_5|  14.0|
| txId_6|  15.0|
| txId_7|  16.0|
| txId_8|  17.0|
| txId_9|  18.0|
|txId_10|  19.0|
+-------+------+

# use select to append multiple literal columns
import pyspark.sql.functions as pyf

allDF3 = df1.select(["*", pyf.lit("abc").alias("x"),
                     pyf.lit("mn0").alias("y")])

allDF3
DataFrame[Name: string, ctrl_1: double, x: string, y: string]
+-------+------+---+---+
|   Name|ctrl_1|  x|  y|
+-------+------+---+---+
| txId_1|   0.0|abc|mn0|
| txId_2|  11.0|abc|mn0|
| txId_3|  12.0|abc|mn0|
| txId_4|  13.0|abc|mn0|
| txId_5|  14.0|abc|mn0|
| txId_6|  15.0|abc|mn0|
| txId_7|  16.0|abc|mn0|
| txId_8|  17.0|abc|mn0|
| txId_9|  18.0|abc|mn0|
|txId_10|  19.0|abc|mn0|
+-------+------+---+---+


df2

DataFrame[Name: string, Length: int, EffectiveLength: double, TPM: double, NumReads: double]

+-------+------+---------------+----+--------+
|   Name|Length|EffectiveLength| TPM|NumReads|
+-------+------+---------------+----+--------+
| txId_1|  1500|         1234.5|12.1|     0.1|
| txId_2|  1510|         1244.5|13.1|    11.1|
| txId_3|  1520|         1254.5|14.1|    12.1|
| txId_4|  1530|         1264.5|15.1|    13.1|
| txId_5|  1540|         1274.5|16.1|    14.1|
| txId_6|  1550|         1284.5|17.1|    15.1|
| txId_7|  1560|         1294.5|18.1|    16.1|
| txId_8|  1570|         1304.5|19.1|    17.1|
| txId_9|  1580|         1314.5|20.1|    18.1|
|txId_10|  1590|         1324.5|21.1|    19.1|
+-------+------+---------------+----+--------+



s2Col = df2["NumReads"].alias('ctrl_2')
print("type(s2Col) = {}".format(type(s2Col)))

type(s2Col) = <class 'pyspark.sql.column.Column'>

allDF4 = df1.select(["*", s2Col])

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/dataframe.py in select(self, *cols)
   1667         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1668         """
-> 1669         jdf = self._jdf.select(self._jcols(*cols))
   1670         return DataFrame(jdf, self.sql_ctx)
   1671

../../sparkBin/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306
   1307         for temp_arg in temp_args:

~/extraCellularRNA/sparkBin/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) NumReads#14 missing from Name#0,ctrl_1#2447 in operator !Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550].;
!Project [Name#0, ctrl_1#2447, NumReads#14 AS ctrl_2#2550]
+- Project [Name#0, NumReads#4 AS ctrl_1#2447]
   +- Project [Name#0, NumReads#4]
      +- Relation[Name#0,Length#1,EffectiveLength#2,TPM#3,NumReads#4] csv

Any idea what my bug is?

Kind regards

Andy
