kondziolka9ld created SPARK-44739:
-------------------------------------

             Summary: Conflicting attribute during join two times the same table (AQE is disabled)
                 Key: SPARK-44739
                 URL: https://issues.apache.org/jira/browse/SPARK-44739
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.0
            Reporter: kondziolka9ld
h2. Issue

I came across something that seems to be a bug in *pyspark* when adaptive query execution is disabled. It concerns joining the same dataframe twice. The dataframe `j1` has to be `checkpoint`-ed before the second join to expose the issue.

----
h2. Reproduction steps

{code:java}
pyspark --conf spark.sql.adaptive.enabled=false
Python 3.8.10 (default, Nov 14 2022, 12:59:47)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
23/08/09 10:18:54 WARN Utils: Your hostname, kondziolka-dd-laptop resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp0s20f3)
23/08/09 10:18:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/09 10:18:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.8.10 (default, Nov 14 2022 12:59:47)
Spark context Web UI available at http://192.168.0.18:4040
Spark context available as 'sc' (master = local[*], app id = local-1691569137130).
SparkSession available as 'spark'.
>>> sc.setCheckpointDir("file:///tmp")
>>> df1 = spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2 = spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
>>> j1 = df1.join(df2, ["id"]).select("fval", "aux").checkpoint()
>>> j1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#6]
>>> # both j1 and df2 refer to the same attribute, aux#6
>>> # let's join df2 to j1; both of them have an aux column
>>> j2 = j1.join(df2, "aux")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/kondziolkadd/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py", line 1539, in join
    jdf = self._jdf.join(other._jdf, on, how)
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Failure when resolving conflicting references in Join:
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false
Conflicting attributes: aux#6
;
'Join Inner
:- LogicalRDD [fval#1L, aux#6], false
+- LogicalRDD [id#4L, target#5L, aux#6], false
{code}

----
h2. Workaround

The workaround is to rename the column twice - an identity rename `X -> X' -> X`. This appears to force a rewrite of the plan metadata (the attribute id changes), which avoids the conflict.

{code:java}
>>> sc.setCheckpointDir("file:///tmp")
>>> df1 = spark.createDataFrame([(1, 42)], ["id", "fval"])
>>> df2 = spark.createDataFrame([(1, 0, "jeden")], ["id", "target", "aux"])
>>> df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,target#5L,aux#6]
>>> j1 = df1.join(df2, ["id"]).select("fval", "aux").withColumnRenamed("aux", "_aux").withColumnRenamed("_aux", "aux").checkpoint()
>>> j1.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[fval#1L,aux#19]
>>> j2 = j1.join(df2, "aux")
>>>
{code}

----
h2. Others

# Repartitioning `j1` before the checkpoint is a workaround as well (even though it does not change the attribute id):
{code:java}
j1=df1.join(df2, ["id"]).select("fval", "aux").repartition(100).checkpoint()
{code}
# Without `checkpoint` the issue does not occur (although the attribute id is the same):
{code:java}
>>> j1=df1.join(df2, ["id"]).select("fval", "aux")
>>> j2=j1.join(df2, "aux")
{code}
# With AQE enabled the issue does not occur.
# I was not able to reproduce it in Spark itself (Scala); I reproduced it only in `pyspark`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)