[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output

2020-12-02 Thread Wenchen Fan (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-33071:
---

Assignee: wuyi

> Join with ambiguous column succeeding but giving wrong output
> -------------------------------------------------------------
>
> Key: SPARK-33071
> URL: https://issues.apache.org/jira/browse/SPARK-33071
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.0.1, 3.1.0
>Reporter: George
>Assignee: wuyi
>Priority: Critical
>  Labels: correctness
> Fix For: 3.1.0
>
>
> When joining two datasets whose join columns are derived from the same input
> dataset, the join runs successfully but does not select the correct columns,
> leading to incorrect output.
> Repro using PySpark:
> {code:java}
> sc.version
> import pyspark.sql.functions as F
> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4},
>  {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2},
>  {'key': 'd', 'sales': 3, 'units' : 6}]
> input_df = spark.createDataFrame(d)
> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> df1 = df1.filter(F.col("key") != F.lit("c"))
> df2 = df2.filter(F.col("key") != F.lit("d"))
> ret = df1.join(df2, df1.key == df2.key, "full").select(
>     df1["key"].alias("df1_key"),
>     df2["key"].alias("df2_key"),
>     df1["sales"],
>     df2["units"],
>     F.coalesce(df1["key"], df2["key"]).alias("key"))
> ret.show()
> ret.explain(){code}
> output for 2.4.4:
> {code:java}
> >>> sc.version
> u'2.4.4'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ... df1["key"].alias("df1_key"),
> ... df2["key"].alias("df2_key"),
> ... df1["sales"],
> ... df2["units"],
> ... F.coalesce(df1["key"], df2["key"]).alias("key"))
> 20/10/05 15:46:14 WARN Column: Constructing trivially true equals predicate, 'key#213 = key#213'. Perhaps you need to use aliases.
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#213 AS df1_key#258, key#213 AS df2_key#259, sales#223L, units#230L, coalesce(key#213, key#213) AS key#260]
> +- SortMergeJoin [key#213], [key#237], FullOuter
>    :- *(2) Sort [key#213 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#213], functions=[sum(sales#214L)])
>    :     +- Exchange hashpartitioning(key#213, 200)
>    :        +- *(1) HashAggregate(keys=[key#213], functions=[partial_sum(sales#214L)])
>    :           +- *(1) Project [key#213, sales#214L]
>    :              +- *(1) Filter (isnotnull(key#213) && NOT (key#213 = c))
>    :                 +- Scan ExistingRDD[key#213,sales#214L,units#215L]
>    +- *(4) Sort [key#237 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#237], functions=[sum(units#239L)])
>          +- Exchange hashpartitioning(key#237, 200)
>             +- *(3) HashAggregate(keys=[key#237], functions=[partial_sum(units#239L)])
>                +- *(3) Project [key#237, units#239L]
>                   +- *(3) Filter (isnotnull(key#237) && NOT (key#237 = d))
>                      +- Scan ExistingRDD[key#237,sales#238L,units#239L]
> {code}
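> As a point of reference (a minimal sketch added here, not part of the original report and not verified on every affected version), the ambiguity can be worked around by aliasing each aggregated DataFrame before the join, so the two "key" references resolve to distinct attributes instead of both collapsing to the left-hand key#213 visible in the plan above:
> {code:java}
> # Hedged sketch: resolve columns through DataFrame aliases rather than
> # through df1[...] / df2[...], which share the same lineage from input_df.
> import pyspark.sql.functions as F
>
> left = df1.alias("l")
> right = df2.alias("r")
> ret = left.join(right, F.col("l.key") == F.col("r.key"), "full").select(
>     F.col("l.key").alias("df1_key"),
>     F.col("r.key").alias("df2_key"),
>     F.col("l.sales"),
>     F.col("r.units"),
>     F.coalesce(F.col("l.key"), F.col("r.key")).alias("key"))
> ret.show()
> {code}
> With the aliases in place, the "trivially true equals predicate" warning should no longer appear and each selected column should refer to its own side of the join.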
> output for 3.0.1:
> {code:java}
> >>> sc.version
> u'3.0.1'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> /usr/local/lib/python2.7/site-packages/pyspark/sql/session.py:381: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
>   warnings.warn("inferring schema from dict is deprecated,"
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))

[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output

2020-11-24 Thread Apache Spark (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-33071:


Assignee: Apache Spark


[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output

2020-11-24 Thread Apache Spark (Jira)


 [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-33071:


Assignee: (was: Apache Spark)
