[ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17209050#comment-17209050 ]

George commented on SPARK-33071:
--------------------------------

[~EveLiao] Yeah, that sounds correct. Another workaround I found was something 
like:
{code:java}
# Rename every column in df1 with a "_1" suffix so the join no longer
# carries two columns that resolve to the same attribute.
df1_cols = list(map(lambda x: x + "_1", df1.columns))
df1 = df1.toDF(*df1_cols){code}
 

The reason I view this as a bug is that it takes a fairly deep understanding of 
how Spark handles columns, copying, and references to expect this behavior. For 
many developers, this simply isn't the behavior they would expect to see.
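
For completeness, here is a minimal sketch of that rename-before-join workaround 
applied to the repro from the description (the "_2" suffix and the df2_renamed 
name are just illustrative choices, not from the original report):
{code:java}
# Assumes df1, df2 and the "import pyspark.sql.functions as F" from the repro below.
# Give df2 distinct column names so df1["key"] and the right-hand key column
# can no longer resolve to the same attribute.
df2_renamed = df2.toDF(*[c + "_2" for c in df2.columns])

ret = df1.join(df2_renamed, df1["key"] == df2_renamed["key_2"], "full").select(
    df1["key"].alias("df1_key"),
    df2_renamed["key_2"].alias("df2_key"),
    df1["sales"],
    df2_renamed["units_2"].alias("units"),
    F.coalesce(df1["key"], df2_renamed["key_2"]).alias("key"))
{code}
Because the rename introduces fresh column names (and hence fresh attribute 
references), the later column lookups cannot silently bind to the same 
underlying attribute, which is what produces the wrong df2_key and units values 
in the repro.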

> Join with ambiguous column succeeding but giving wrong output
> -------------------------------------------------------------
>
>                 Key: SPARK-33071
>                 URL: https://issues.apache.org/jira/browse/SPARK-33071
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.0.1
>            Reporter: George
>            Priority: Major
>              Labels: correctness
>
> When joining two datasets where a column in each dataset is sourced from the 
> same input dataset, the join runs successfully but does not select the 
> correct columns, leading to incorrect output.
> Repro using pyspark:
> {code:java}
> sc.version
> import pyspark.sql.functions as F
> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' 
> : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 
> 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> input_df = spark.createDataFrame(d)
> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> df1 = df1.filter(F.col("key") != F.lit("c"))
> df2 = df2.filter(F.col("key") != F.lit("d"))
> ret = df1.join(df2, df1.key == df2.key, "full").select(
> df1["key"].alias("df1_key"),
> df2["key"].alias("df2_key"),
> df1["sales"],
> df2["units"],
> F.coalesce(df1["key"], df2["key"]).alias("key"))
> ret.show()
> ret.explain(){code}
> output for 2.4.4:
> {code:java}
> >>> sc.version
> u'2.4.4'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 
> >>> 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 
> >>> 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ... df1["key"].alias("df1_key"),
> ... df2["key"].alias("df2_key"),
> ... df1["sales"],
> ... df2["units"],
> ... F.coalesce(df1["key"], df2["key"]).alias("key"))
> 20/10/05 15:46:14 WARN Column: Constructing trivially true equals predicate, 
> 'key#213 = key#213'. Perhaps you need to use aliases.
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#213 AS df1_key#258, key#213 AS df2_key#259, sales#223L, 
> units#230L, coalesce(key#213, key#213) AS key#260]
> +- SortMergeJoin [key#213], [key#237], FullOuter
>    :- *(2) Sort [key#213 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#213], functions=[sum(sales#214L)])
>    :     +- Exchange hashpartitioning(key#213, 200)
>    :        +- *(1) HashAggregate(keys=[key#213], 
> functions=[partial_sum(sales#214L)])
>    :           +- *(1) Project [key#213, sales#214L]
>    :              +- *(1) Filter (isnotnull(key#213) && NOT (key#213 = c))
>    :                 +- Scan ExistingRDD[key#213,sales#214L,units#215L]
>    +- *(4) Sort [key#237 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#237], functions=[sum(units#239L)])
>          +- Exchange hashpartitioning(key#237, 200)
>             +- *(3) HashAggregate(keys=[key#237], 
> functions=[partial_sum(units#239L)])
>                +- *(3) Project [key#237, units#239L]
>                   +- *(3) Filter (isnotnull(key#237) && NOT (key#237 = d))
>                      +- Scan ExistingRDD[key#237,sales#238L,units#239L]
> {code}
> output for 3.0.1:
> {code:java}
> >>> sc.version
> u'3.0.1'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 
> >>> 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 
> >>> 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> /usr/local/lib/python2.7/site-packages/pyspark/sql/session.py:381: 
> UserWarning: inferring schema from dict is deprecated,please use 
> pyspark.sql.Row instead
>   warnings.warn("inferring schema from dict is deprecated,"
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ... df1["key"].alias("df1_key"),
> ... df2["key"].alias("df2_key"),
> ... df1["sales"],
> ... df2["units"],
> ... F.coalesce(df1["key"], df2["key"]).alias("key"))
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#0 AS df1_key#45, key#0 AS df2_key#46, sales#10L, units#17L, 
> coalesce(key#0, key#0) AS key#47]
> +- SortMergeJoin [key#0], [key#24], FullOuter
>    :- *(2) Sort [key#0 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#0], functions=[sum(sales#1L)])
>    :     +- Exchange hashpartitioning(key#0, 200), true, [id=#152]
>    :        +- *(1) HashAggregate(keys=[key#0], 
> functions=[partial_sum(sales#1L)])
>    :           +- *(1) Project [key#0, sales#1L]
>    :              +- *(1) Filter (isnotnull(key#0) AND NOT (key#0 = c))
>    :                 +- *(1) Scan ExistingRDD[key#0,sales#1L,units#2L]
>    +- *(4) Sort [key#24 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#24], functions=[sum(units#26L)])
>          +- Exchange hashpartitioning(key#24, 200), true, [id=#158]
>             +- *(3) HashAggregate(keys=[key#24], 
> functions=[partial_sum(units#26L)])
>                +- *(3) Project [key#24, units#26L]
>                   +- *(3) Filter (isnotnull(key#24) AND NOT (key#24 = d))
>                      +- *(3) Scan 
> ExistingRDD[key#24,sales#25L,units#26L]{code}
> key#0 is the reference used for both alias operations and both sides of the 
> coalesce, despite the query plan projecting key#24 for the right side of the 
> join.
> Concretely, I believe the output of the join should be this:
> {code:java}
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|   null|    3| null|   d|
> |   null|      c| null|    2|   c|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> {code}
>  


