Eric Liang created SPARK-16818:
----------------------------------

             Summary: Exchange reuse incorrectly reuses scans over different sets of partitions
                 Key: SPARK-16818
                 URL: https://issues.apache.org/jira/browse/SPARK-16818
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Eric Liang
            Priority: Critical


This happens because the file scan operator does not take partition pruning 
into account in its implementation of `sameResult()`. As a result, queries 
that self-join the same base file relation may return incorrect results. 
Here's a minimal test case to reproduce:

{code}
    // Assumes a Spark SQL test suite that provides withTempPath and checkAnswer.
    spark.conf.set("spark.sql.exchange.reuse", true)  // defaults to true in 2.0
    withTempPath { path =>
      val tempDir = path.getCanonicalPath
      // Write a table partitioned by column "a" (values 0 and 1).
      spark.range(10)
        .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
        .write
        .partitionBy("a")
        .parquet(tempDir)
      val df = spark.read.parquet(tempDir)
      // Two aggregates over the same relation, each pruned to a different partition.
      val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
      val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
      checkAnswer(
        df1.join(df2, "b"),
        Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
    }
{code}
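
The expected rows in the `checkAnswer` call can be checked by hand. The following is a minimal sketch in plain Scala collections (no Spark involved) that models the same data and the same filter, group, sum, and join steps. The object name `ExpectedSums` and its helpers are made up for illustration and are not part of Spark or of the test suite.

```scala
// Plain-Scala model of the test data; no Spark required.
// ExpectedSums and its members are hypothetical names used only for this sketch.
object ExpectedSums {
  // Rows written by the test: (a, b, c) = (id % 2, id % 3, id) for id in 0 until 10.
  val rows: Seq[(Long, Long, Long)] =
    (0L until 10L).map(id => (id % 2, id % 3, id))

  // Sum of c grouped by b, restricted to a single value of the partition column a.
  def sumByB(a: Long): Map[Long, Long] =
    rows.filter(_._1 == a)
      .groupBy(_._2)
      .map { case (b, group) => b -> group.map(_._3).sum }

  // Inner join of the two aggregates on b, sorted by b.
  def joined: Seq[(Long, Long, Long)] = {
    val left = sumByB(0)
    val right = sumByB(1)
    left.keys.toSeq.sorted.collect {
      case b if right.contains(b) => (b, left(b), right(b))
    }
  }

  def main(args: Array[String]): Unit =
    joined.foreach(println)
}
```

Each tuple is `(b, sum(c) where a = 0, sum(c) where a = 1)`, so the two sum columns come from disjoint sets of rows and should differ for every value of `b`.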

When exchange reuse is on, the result is wrong; the second sum(c) column 
repeats the first:
{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|     6|
|  1|     4|     4|
|  2|    10|    10|
+---+------+------+
{code}

The correct result is:
{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|    12|
|  1|     4|     8|
|  2|    10|     5|
+---+------+------+
{code}
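
The root cause described at the top of this report can be illustrated with a toy model. This is only a sketch under stated assumptions: `ScanModel`, its fields, and both comparison methods are hypothetical and do not reflect Spark's actual classes or the eventual fix. It shows why an equality check that ignores pruning predicates treats the `a = 0` and `a = 1` scans as interchangeable.

```scala
// Hypothetical, simplified model of a file scan node; not Spark's actual classes.
final case class ScanModel(
    relationPath: String,          // base file relation being scanned
    outputColumns: Seq[String],    // columns produced by the scan
    partitionFilters: Set[String]  // pruning predicates, e.g. "a = 0"
) {
  // Comparison that ignores partition pruning: scans over different
  // partitions of the same relation look interchangeable.
  def sameResultIgnoringPruning(other: ScanModel): Boolean =
    relationPath == other.relationPath && outputColumns == other.outputColumns

  // Comparison that additionally requires identical pruning predicates.
  def sameResultWithPruning(other: ScanModel): Boolean =
    sameResultIgnoringPruning(other) && partitionFilters == other.partitionFilters
}

object ScanModelDemo {
  // Two scans over the same (made-up) path, pruned to different partitions.
  val scanA0 = ScanModel("/tmp/example", Seq("b", "c"), Set("a = 0"))
  val scanA1 = ScanModel("/tmp/example", Seq("b", "c"), Set("a = 1"))

  def main(args: Array[String]): Unit = {
    println(scanA0.sameResultIgnoringPruning(scanA1))
    println(scanA0.sameResultWithPruning(scanA1))
  }
}
```

In this model, a reuse rule built on the first comparison would substitute one scan's exchange for the other's, which matches the duplicated sum(c) column in the incorrect output.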


