[ https://issues.apache.org/jira/browse/SPARK-24051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16449513#comment-16449513 ]

Emlyn Corrin commented on SPARK-24051:
--------------------------------------

I've reshuffled the PySpark version to make it even clearer:
{code:python}
from pyspark.sql import functions, Window
cols = [functions.col("a"),
        functions.col("b").alias("b"),
        functions.count(functions.lit(1)).over(Window.partitionBy()).alias("n")]
ds1 = spark.createDataFrame([[1,42],[2,99]], ["a","b"]).select(cols)
ds1.show()
# +---+---+---+
# |  a|  b|  n|
# +---+---+---+
# |  1| 42|  2|
# |  2| 99|  2|
# +---+---+---+

ds2 = spark.createDataFrame([[3]], ["a"]).withColumn("b", functions.lit(0)).select(cols)
ds2.show()
# +---+---+---+
# |  a|  b|  n|
# +---+---+---+
# |  3|  0|  1|
# +---+---+---+

ds1.union(ds2).show()  # look at column b
# +---+---+---+
# |  a|  b|  n|
# +---+---+---+
# |  1|  0|  2|
# |  2|  0|  2|
# |  3|  0|  1|
# +---+---+---+

ds1.union(ds2).explain()  # the literal 0 as "b" has been pushed into both branches of the union
# == Physical Plan ==
# Union
# :- *(2) Project [a#102L, 0 AS b#0, n#2L]
# :  +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n#2L]
# :     +- Exchange SinglePartition
# :        +- *(1) Project [a#102L]
# :           +- Scan ExistingRDD[a#102L,b#103L]
# +- *(3) Project [a#22L, 0 AS b#126L, n#2L]
#    +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n#2L]
#       +- Exchange SinglePartition
#          +- Scan ExistingRDD[a#22L]

ds1.union(ds2).drop("n").show() # if we drop "n", column b is correct again:
# +---+---+
# |  a|  b|
# +---+---+
# |  1| 42|
# |  2| 99|
# |  3|  0|
# +---+---+
{code}
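
For reference, here is a minimal sketch of the workaround hinted at in the original description: dropping the redundant {{.alias("b")}} (the rename of column b to its own name) is reported to avoid the bad plan. This reuses the imports and {{spark}} session from the snippet above; the variable names and the expected output (taken from the report, not re-verified here) are illustrative.
{code:python}
# Same query, but without the no-op rename of "b" to itself.
cols_fixed = [functions.col("a"),
              functions.col("b"),  # no .alias("b")
              functions.count(functions.lit(1)).over(Window.partitionBy()).alias("n")]
ds1_fixed = spark.createDataFrame([[1, 42], [2, 99]], ["a", "b"]).select(cols_fixed)
ds2_fixed = spark.createDataFrame([[3]], ["a"]).withColumn("b", functions.lit(0)).select(cols_fixed)
ds1_fixed.union(ds2_fixed).show()
# Per the report, column b now keeps its original values:
# +---+---+---+
# |  a|  b|  n|
# +---+---+---+
# |  1| 42|  2|
# |  2| 99|  2|
# |  3|  0|  1|
# +---+---+---+
{code}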

> Incorrect results for certain queries using Java and Python APIs on Spark 2.3.0
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-24051
>                 URL: https://issues.apache.org/jira/browse/SPARK-24051
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Emlyn Corrin
>            Priority: Major
>
> I'm seeing Spark 2.3.0 return incorrect results for a certain (very specific) 
> query, demonstrated by the Java program below. It was simplified from a much 
> more complex query, but I'm having trouble simplifying it further without 
> removing the erroneous behaviour.
> {code:java}
> package sparktest;
> import org.apache.spark.SparkConf;
> import org.apache.spark.sql.*;
> import org.apache.spark.sql.expressions.Window;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.Metadata;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
> import java.util.Arrays;
> public class Main {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>                 .setAppName("SparkTest")
>                 .setMaster("local[*]");
>         SparkSession session = SparkSession.builder().config(conf).getOrCreate();
>         Row[] arr1 = new Row[]{
>                 RowFactory.create(1, 42),
>                 RowFactory.create(2, 99)};
>         StructType sch1 = new StructType(new StructField[]{
>                 new StructField("a", DataTypes.IntegerType, true, 
> Metadata.empty()),
>                 new StructField("b", DataTypes.IntegerType, true, 
> Metadata.empty())});
>         Dataset<Row> ds1 = session.createDataFrame(Arrays.asList(arr1), sch1);
>         ds1.show();
>         Row[] arr2 = new Row[]{
>                 RowFactory.create(3)};
>         StructType sch2 = new StructType(new StructField[]{
>                 new StructField("a", DataTypes.IntegerType, true, 
> Metadata.empty())});
>         Dataset<Row> ds2 = session.createDataFrame(Arrays.asList(arr2), sch2)
>                 .withColumn("b", functions.lit(0));
>         ds2.show();
>         Column[] cols = new Column[]{
>                 new Column("a"),
>                 new Column("b").as("b"),
>                 functions.count(functions.lit(1))
>                         .over(Window.partitionBy())
>                         .as("n")};
>         Dataset<Row> ds = ds1
>                 .select(cols)
>                 .union(ds2.select(cols))
>                 .where(new Column("n").geq(1))
>                 .drop("n");
>         ds.show();
>         //ds.explain(true);
>     }
> }
> {code}
> It just calculates the union of 2 datasets,
> {code:java}
> +---+---+
> |  a|  b|
> +---+---+
> |  1| 42|
> |  2| 99|
> +---+---+
> {code}
> with
> {code:java}
> +---+---+
> |  a|  b|
> +---+---+
> |  3|  0|
> +---+---+
> {code}
> The expected result is:
> {code:java}
> +---+---+
> |  a|  b|
> +---+---+
> |  1| 42|
> |  2| 99|
> |  3|  0|
> +---+---+
> {code}
> but instead it prints:
> {code:java}
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  0|
> |  2|  0|
> |  3|  0|
> +---+---+
> {code}
> Notice how the value in column b is always zero, overriding the original
> values in rows 1 and 2.
>  Making seemingly trivial changes, such as replacing {{new Column("b").as("b"),}}
> with just {{new Column("b"),}} or removing the {{where}} clause after the
> union, makes it behave correctly again.


