kondziolka9ld created SPARK-44739:
-------------------------------------

             Summary: Conflicting attributes when joining the same table twice (AQE disabled)
                 Key: SPARK-44739
                 URL: https://issues.apache.org/jira/browse/SPARK-44739
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.0
            Reporter: kondziolka9ld


h2. Issue

I came across something that seems to be a bug in *pyspark* (when I disable adaptive query execution).

It is about joining the same dataframe twice.

It is necessary to `checkpoint` the dataframe `j1` before the second join to expose this issue.
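
Condensed from the full transcript below, the failing sequence boils down to a few lines (same data and column names as in the reproduction steps):
{code:java}
sc.setCheckpointDir("file:///tmp")
df1 = spark.createDataFrame([(1, 42)], ["id", "fval"])
df2 = spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
j1 = df1.join(df2, ["id"]).select("fval", "aux").checkpoint()
j2 = j1.join(df2, "aux")  # AnalysisException: conflicting attributes: aux#6
{code}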
----
h2. Reproduction steps

 
{code:java}
pyspark --conf spark.sql.adaptive.enabled=false
Python 3.8.10 (default, Nov 14 2022, 12:59:47) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
23/08/09 10:18:54 WARN Utils: Your hostname, kondziolka-dd-laptop resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp0s20f3)
23/08/09 10:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/09 10:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.8.10 (default, Nov 14 2022 12:59:47)
Spark context Web UI available at http://192.168.0.18:4040
Spark context available as 'sc' (master = local[*], app id = local-1691569137130).
SparkSession available as 'spark'.

>>> sc.setCheckpointDir("file:///tmp")
>>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
>>> j1=df1.join(df2, ["id"]).select("fval", "aux").checkpoint()
>>> j1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#6]
>>> # we see that both j1 and df2 refer to the same attribute aux#6
>>> # let's join df2 to j1; both of them have an aux column
>>> j2=j1.join(df2, "aux")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 1539, in join
    jdf = self._jdf.join(other._jdf, on, how)
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: 
Failure when resolving conflicting references in Join:
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false
Conflicting attributes: aux#6
;
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false
{code}
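
Note that the failure is raised at analysis time, while the join plan is being resolved, not when an action runs. A small sketch to confirm this (it just catches the exception shown in the traceback above):
{code:java}
from pyspark.sql.utils import AnalysisException

# The conflict surfaces as soon as the join is defined, before any action:
try:
    j2 = j1.join(df2, "aux")
except AnalysisException as e:
    print("analysis failed:", e)
{code}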
 
----
h2. Workaround

The workaround is to rename the column twice - an identity rename `X -> X' -> X`.

It looks like this forces a rewrite of the metadata (it changes the attribute id) and in this way avoids the conflict.
{code:java}
>>> sc.setCheckpointDir("file:///tmp")
>>> df1=spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2=spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
>>> j1=df1.join(df2, ["id"]).select("fval", "aux").withColumnRenamed("aux", "_aux").withColumnRenamed("_aux", "aux").checkpoint()
>>> j1.explain()                                                                
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#19]
>>> j2=j1.join(df2, "aux")
>>>
{code}
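
The same trick can be packaged as a small helper (just a sketch; the helper name is arbitrary). It applies the identity rename to every column, so each one gets a fresh attribute id before the checkpoint:
{code:java}
# Sketch of a generic helper (name is hypothetical): identity-rename every
# column so the analyzer assigns new attribute ids, avoiding the conflict.
def refresh_attribute_ids(df):
    for c in df.columns:
        tmp = "__tmp_" + c
        df = df.withColumnRenamed(c, tmp).withColumnRenamed(tmp, c)
    return df

j1 = refresh_attribute_ids(
    df1.join(df2, ["id"]).select("fval", "aux")
).checkpoint()
j2 = j1.join(df2, "aux")  # no conflicting-attribute error
{code}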
----
h2. Others
 # Repartitioning `j1` before the checkpoint is a workaround as well (even though it does not change the id of the attribute):

{code:java}
j1=df1.join(df2, ["id"]).select("fval", "aux").repartition(100).checkpoint() 
{code}

 # Without `checkpoint` the issue does not occur (although the attribute id is the same):

{code:java}
>>> j1=df1.join(df2, ["id"]).select("fval", "aux")
>>> j2=j1.join(df2, "aux")
{code}

 # With AQE enabled the issue does not occur (see the config sketch after this list).
 # I was not able to reproduce it in Scala Spark - I reproduced it only in `pyspark`.
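
Since the issue only reproduces with adaptive query execution disabled, keeping AQE on (its default since Spark 3.2) also sidesteps it:
{code:java}
# AQE is controlled by this standard config; the issue above was observed
# only with it set to false.
spark.conf.set("spark.sql.adaptive.enabled", "true")
{code}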

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
