[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output
     [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-33071:
-----------------------------------

    Assignee: wuyi

> Join with ambiguous column succeeding but giving wrong output
> -------------------------------------------------------------
>
>                 Key: SPARK-33071
>                 URL: https://issues.apache.org/jira/browse/SPARK-33071
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.0.1, 3.1.0
>            Reporter: George
>            Assignee: wuyi
>            Priority: Critical
>              Labels: correctness
>             Fix For: 3.1.0
>
>
> When joining two datasets where one column in each dataset is sourced from
> the same input dataset, the join successfully runs, but does not select the
> correct columns, leading to incorrect output.
> Repro using pyspark:
> {code:java}
> sc.version
> import pyspark.sql.functions as F
> d = [{'key': 'a', 'sales': 1, 'units' : 2},
>      {'key': 'a', 'sales': 2, 'units' : 4},
>      {'key': 'b', 'sales': 5, 'units' : 10},
>      {'key': 'c', 'sales': 1, 'units' : 2},
>      {'key': 'd', 'sales': 3, 'units' : 6}]
> input_df = spark.createDataFrame(d)
> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> df1 = df1.filter(F.col("key") != F.lit("c"))
> df2 = df2.filter(F.col("key") != F.lit("d"))
> ret = df1.join(df2, df1.key == df2.key, "full").select(
>     df1["key"].alias("df1_key"),
>     df2["key"].alias("df2_key"),
>     df1["sales"],
>     df2["units"],
>     F.coalesce(df1["key"], df2["key"]).alias("key"))
> ret.show()
> ret.explain()
> {code}
> output for 2.4.4:
> {code:java}
> >>> sc.version
> u'2.4.4'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.filter(F.col("key") != F.lit("c"))
> >>> df2 = df2.filter(F.col("key") != F.lit("d"))
> >>> ret = df1.join(df2, df1.key == df2.key, "full").select(
> ...     df1["key"].alias("df1_key"),
> ...     df2["key"].alias("df2_key"),
> ...     df1["sales"],
> ...     df2["units"],
> ...     F.coalesce(df1["key"], df2["key"]).alias("key"))
> 20/10/05 15:46:14 WARN Column: Constructing trivially true equals predicate, 'key#213 = key#213'. Perhaps you need to use aliases.
> >>> ret.show()
> +-------+-------+-----+-----+----+
> |df1_key|df2_key|sales|units| key|
> +-------+-------+-----+-----+----+
> |      d|      d|    3| null|   d|
> |   null|   null| null|    2|null|
> |      b|      b|    5|   10|   b|
> |      a|      a|    3|    6|   a|
> +-------+-------+-----+-----+----+
>
> >>> ret.explain()
> == Physical Plan ==
> *(5) Project [key#213 AS df1_key#258, key#213 AS df2_key#259, sales#223L, units#230L, coalesce(key#213, key#213) AS key#260]
> +- SortMergeJoin [key#213], [key#237], FullOuter
>    :- *(2) Sort [key#213 ASC NULLS FIRST], false, 0
>    :  +- *(2) HashAggregate(keys=[key#213], functions=[sum(sales#214L)])
>    :     +- Exchange hashpartitioning(key#213, 200)
>    :        +- *(1) HashAggregate(keys=[key#213], functions=[partial_sum(sales#214L)])
>    :           +- *(1) Project [key#213, sales#214L]
>    :              +- *(1) Filter (isnotnull(key#213) && NOT (key#213 = c))
>    :                 +- Scan ExistingRDD[key#213,sales#214L,units#215L]
>    +- *(4) Sort [key#237 ASC NULLS FIRST], false, 0
>       +- *(4) HashAggregate(keys=[key#237], functions=[sum(units#239L)])
>          +- Exchange hashpartitioning(key#237, 200)
>             +- *(3) HashAggregate(keys=[key#237], functions=[partial_sum(units#239L)])
>                +- *(3) Project [key#237, units#239L]
>                   +- *(3) Filter (isnotnull(key#237) && NOT (key#237 = d))
>                      +- Scan ExistingRDD[key#237,sales#238L,units#239L]
> {code}
> output for 3.0.1:
> {code:java}
> >>> sc.version
> u'3.0.1'
> >>> import pyspark.sql.functions as F
> >>> d = [{'key': 'a', 'sales': 1, 'units' : 2}, {'key': 'a', 'sales': 2, 'units' : 4}, {'key': 'b', 'sales': 5, 'units' : 10}, {'key': 'c', 'sales': 1, 'units' : 2}, {'key': 'd', 'sales': 3, 'units' : 6}]
> >>> input_df = spark.createDataFrame(d)
> /usr/local/lib/python2.7/site-packages/pyspark/sql/session.py:381: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
>   warnings.warn("inferring schema from dict is deprecated,"
> >>> df1 = input_df.groupBy("key").agg(F.sum('sales').alias('sales'))
> >>> df2 = input_df.groupBy("key").agg(F.sum('units').alias('units'))
> >>> df1 = df1.fil
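For comparison, here is a minimal sketch in plain Python (no Spark required) that works out what the full outer join in the repro should return for the same five input rows. The helper names `sum_by_key` and `expected_full_outer_join` are hypothetical. They only mimic `groupBy("key")` with a sum, the two filters, and a full outer join on `key`, with `None` standing in for SQL null.

```python
from collections import defaultdict

# Same five input rows as the repro's `d`.
ROWS = [
    {"key": "a", "sales": 1, "units": 2},
    {"key": "a", "sales": 2, "units": 4},
    {"key": "b", "sales": 5, "units": 10},
    {"key": "c", "sales": 1, "units": 2},
    {"key": "d", "sales": 3, "units": 6},
]


def sum_by_key(rows, column):
    """Hypothetical helper: plain-Python analogue of groupBy("key").agg(F.sum(column))."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["key"]] += row[column]
    return dict(totals)


def expected_full_outer_join(rows):
    """Hypothetical helper: the rows the repro's select() should produce."""
    # df1: total sales per key, with key "c" filtered out.
    df1 = {k: v for k, v in sum_by_key(rows, "sales").items() if k != "c"}
    # df2: total units per key, with key "d" filtered out.
    df2 = {k: v for k, v in sum_by_key(rows, "units").items() if k != "d"}
    result = []
    # A full outer join keeps every key present on either side.
    for key in sorted(df1.keys() | df2.keys()):
        df1_key = key if key in df1 else None
        df2_key = key if key in df2 else None
        result.append((
            df1_key,                                      # df1["key"]
            df2_key,                                      # df2["key"]
            df1.get(key),                                 # df1["sales"]
            df2.get(key),                                 # df2["units"]
            df1_key if df1_key is not None else df2_key,  # coalesce(df1.key, df2.key)
        ))
    return result


for row in expected_full_outer_join(ROWS):
    print(row)
```

This yields `('a', 'a', 3, 6, 'a')`, `('b', 'b', 5, 10, 'b')`, `(None, 'c', None, 2, 'c')` and `('d', None, 3, None, 'd')`. Compared with the 2.4.4 `ret.show()` output, the `c` and `d` rows differ. The report shows `d` in `df2_key` for the `d` row, and nulls in `df2_key` and `key` for the `c` row. Both match the physical plan, which projects `key#213` (df1's key) for `df1_key`, `df2_key`, and both arguments of `coalesce`.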
[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output
     [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-33071:
------------------------------------

    Assignee: Apache Spark
[jira] [Assigned] (SPARK-33071) Join with ambiguous column succeeding but giving wrong output
     [ https://issues.apache.org/jira/browse/SPARK-33071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-33071:
------------------------------------

    Assignee: (was: Apache Spark)