[ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17209050#comment-17209050 ]
George commented on SPARK-33071:
--------------------------------

[~EveLiao] Yeah, that sounds correct. Another workaround I found was something like:
{code:java}
df1_cols = list(map(lambda x: x + "_1", df1.columns))
df1 = df1.toDF(*df1_cols)
{code}
The reason I view this as a bug is that it takes a fairly deep understanding of how Spark handles columns, copying, and references to expect this behavior. For many developers, this just wouldn't be the behavior they expect to see.
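For illustration, here is a minimal sketch (an editorial addition, not part of the comment above) of how that renaming workaround might look when applied to the repro quoted below; it assumes the same input_df, and the "_1" suffix is arbitrary:
{code:java}
import pyspark.sql.functions as F

# Same aggregates as in the repro quoted below.
df1 = input_df.groupBy("key").agg(F.sum("sales").alias("sales"))
df2 = input_df.groupBy("key").agg(F.sum("units").alias("units"))
df1 = df1.filter(F.col("key") != F.lit("c"))
df2 = df2.filter(F.col("key") != F.lit("d"))

# Rename every column of df1 so the two sides of the join share no names
# and a column reference cannot silently resolve to the wrong side.
df1 = df1.toDF(*[c + "_1" for c in df1.columns])

ret = df1.join(df2, df1["key_1"] == df2["key"], "full").select(
    df1["key_1"].alias("df1_key"),
    df2["key"].alias("df2_key"),
    df1["sales_1"].alias("sales"),
    df2["units"],
    F.coalesce(df1["key_1"], df2["key"]).alias("key"))
ret.show()
{code}
Because the two sides no longer share any column names, each reference can only resolve to one attribute, and the full outer join should produce the corrected rows shown at the bottom of the report.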
> Join with ambiguous column succeeding but giving wrong output
> -------------------------------------------------------------
>
>                 Key: SPARK-33071
>                 URL: https://issues.apache.org/jira/browse/SPARK-33071
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.0.1
>            Reporter: George
>            Priority: Major
>              Labels: correctness
>
> When joining two datasets where one column in each dataset is sourced from
> the same input dataset, the join successfully runs, but does not select the
> correct columns, leading to incorrect output.
> Repro using pyspark:
> {code:java}
> sc.version
> import pyspark.sql.functions as F
> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> input_df = spark.createDataFrame(d)
> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> df1 = df1.filter(F.col("key") != F.lit("c"))
> df2 = df2.filter(F.col("key") != F.lit("d"))
> ret = df1.join(df2, df1.key == df2.key, "full").select(
>     df1["key"].alias("df1_key"),
>     df2["key"].alias("df2_key"),
>     df1["sales"],
>     df2["units"],
>     F.coalesce(df1["key"], df2["key"]).alias("key"))
> ret.show()
> ret.explain()
> {code}
> output for 2.4.4:
> {code:java}
> >>> sc.version
> u'2.4.4'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ...     df1["key"].alias("df1_key"),
> ...     df2["key"].alias("df2_key"),
> ...     df1["sales"],
> ...     df2["units"],
> ...     F.coalesce(df1["key"], df2["key"]).alias("key"))
> 20/10/05 15:46:14 WARN Column: Constructing trivially true equals predicate, 'key#213 = key#213'. Perhaps you need to use aliases.
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
>
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#213 AS df1_key#258, key#213 AS df2_key#259, sales#223L, units#230L, coalesce(key#213, key#213) AS key#260]
> +- SortMergeJoin [key#213], [key#237], FullOuter
>    :- *(2) Sort [key#213 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#213], functions=[sum(sales#214L)])
>    :     +- Exchange hashpartitioning(key#213, 200)
>    :        +- *(1) HashAggregate(keys=[key#213], functions=[partial_sum(sales#214L)])
>    :           +- *(1) Project [key#213, sales#214L]
>    :              +- *(1) Filter (isnotnull(key#213) && NOT (key#213 = c))
>    :                 +- Scan ExistingRDD[key#213,sales#214L,units#215L]
>    +- *(4) Sort [key#237 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#237], functions=[sum(units#239L)])
>          +- Exchange hashpartitioning(key#237, 200)
>             +- *(3) HashAggregate(keys=[key#237], functions=[partial_sum(units#239L)])
>                +- *(3) Project [key#237, units#239L]
>                   +- *(3) Filter (isnotnull(key#237) && NOT (key#237 = d))
>                      +- Scan ExistingRDD[key#237,sales#238L,units#239L]
> {code}
> output for 3.0.1:
> {code:java}
> >>> sc.version
> u'3.0.1'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> /usr/local/lib/python2.7/site-packages/pyspark/sql/session.py:381: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
>   warnings.warn("inferring schema from dict is deprecated,"
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ...     df1["key"].alias("df1_key"),
> ...     df2["key"].alias("df2_key"),
> ...     df1["sales"],
> ...     df2["units"],
> ...     F.coalesce(df1["key"], df2["key"]).alias("key"))
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
>
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#0 AS df1_key#45, key#0 AS df2_key#46, sales#10L, units#17L, coalesce(key#0, key#0) AS key#47]
> +- SortMergeJoin [key#0], [key#24], FullOuter
>    :- *(2) Sort [key#0 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#0], functions=[sum(sales#1L)])
>    :     +- Exchange hashpartitioning(key#0, 200), true, [id=#152]
>    :        +- *(1) HashAggregate(keys=[key#0], functions=[partial_sum(sales#1L)])
>    :           +- *(1) Project [key#0, sales#1L]
>    :              +- *(1) Filter (isnotnull(key#0) AND NOT (key#0 = c))
>    :                 +- *(1) Scan ExistingRDD[key#0,sales#1L,units#2L]
>    +- *(4) Sort [key#24 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#24], functions=[sum(units#26L)])
>          +- Exchange hashpartitioning(key#24, 200), true, [id=#158]
>             +- *(3) HashAggregate(keys=[key#24], functions=[partial_sum(units#26L)])
>                +- *(3) Project [key#24, units#26L]
>                   +- *(3) Filter (isnotnull(key#24) AND NOT (key#24 = d))
>                      +- *(3) Scan ExistingRDD[key#24,sales#25L,units#26L]
> {code}
> key#0 is the reference used for both alias operations and both sides of the
> coalesce, despite the query plan projecting key#24 for the right side of the
> join.
> Concretely, I believe the output of the join should be this:
> {code:java}
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|   null|    3| null|   d|
> |   null|      c| null|    2|   c|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> {code}
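The 2.4.4 log above already hints at the other way around this ("Perhaps you need to use aliases"). As an editorial sketch, not part of the original report, here is the same query written with explicit DataFrame aliases so that columns are resolved by qualified name rather than by attribute reference (df1 and df2 as built in the repro):
{code:java}
import pyspark.sql.functions as F

# Attach distinct aliases to each side of the join.
a = df1.alias("a")
b = df2.alias("b")

# Refer to columns by their qualified names, which cannot collapse onto a
# single attribute the way df1["key"] and df2["key"] do in the report above.
ret = a.join(b, F.col("a.key") == F.col("b.key"), "full").select(
    F.col("a.key").alias("df1_key"),
    F.col("b.key").alias("df2_key"),
    F.col("a.sales"),
    F.col("b.units"),
    F.coalesce(F.col("a.key"), F.col("b.key")).alias("key"))
ret.show()
{code}
With either workaround the full outer join should keep the d row from df1 and the c row from df2 separate, matching the expected table above.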