Eric Liang created SPARK-16818:
----------------------------------

             Summary: Exchange reuse incorrectly reuses scans over different sets of partitions
                 Key: SPARK-16818
                 URL: https://issues.apache.org/jira/browse/SPARK-16818
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Eric Liang
            Priority: Critical

Exchange reuse can incorrectly reuse a scan over a different set of partitions because the file scan operator does not take partition pruning into account in its implementation of `sameResult()`. As a result, self-joins over the same base file relation may return incorrect results.

Here's a minimal test case to reproduce:

{code}
spark.conf.set("spark.sql.exchange.reuse", true)  // defaults to true in 2.0
withTempPath { path =>
  val tempDir = path.getCanonicalPath
  // Write a small table partitioned by column "a" (values 0 and 1).
  spark.range(10)
    .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
    .write
    .partitionBy("a")
    .parquet(tempDir)
  val df = spark.read.parquet(tempDir)
  // Two aggregations over the same relation, each pruned to a different partition.
  val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
  val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
  checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
}
{code}

When exchange reuse is on, the result is

{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|     6|
|  1|     4|     4|
|  2|    10|    10|
+---+------+------+
{code}

The correct result is

{code}
+---+------+------+
|  b|sum(c)|sum(c)|
+---+------+------+
|  0|     6|    12|
|  1|     4|     8|
|  2|    10|     5|
+---+------+------+
{code}
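
To illustrate the `sameResult()` problem described in this report, here is a minimal, self-contained sketch in plain Scala. It does not use Spark's real classes: `ToyFileScan`, `sameResultIgnoringPruning`, `sameResultWithPruning`, and the path `/tmp/t` are all hypothetical names made up for this illustration, and the actual fix in Spark may look quite different. The sketch only shows why an equality check that skips the partition filters would treat the `a = 0` and `a = 1` scans as interchangeable.

```scala
// Hypothetical, simplified stand-in for a file scan node. NOT Spark's actual API.
final case class ToyFileScan(
    relationPath: String,          // base file relation being scanned
    outputColumns: Seq[String],    // columns produced by the scan
    partitionFilters: Set[String]  // partition-pruning predicates, kept as strings
) {
  // Models the reported bug: partition pruning is left out of the comparison,
  // so scans over different partitions of one relation look identical.
  def sameResultIgnoringPruning(other: ToyFileScan): Boolean =
    relationPath == other.relationPath && outputColumns == other.outputColumns

  // Assumed remedy for this sketch: also compare the partition filters.
  def sameResultWithPruning(other: ToyFileScan): Boolean =
    sameResultIgnoringPruning(other) && partitionFilters == other.partitionFilters
}

object SameResultSketch {
  // The two sides of the self-join in the test case, pruned to different partitions.
  val scanA0: ToyFileScan = ToyFileScan("/tmp/t", Seq("b", "c"), Set("a = 0"))
  val scanA1: ToyFileScan = ToyFileScan("/tmp/t", Seq("b", "c"), Set("a = 1"))

  def main(args: Array[String]): Unit = {
    // true: a reuse rule built on this check would substitute one scan for the other
    println(scanA0.sameResultIgnoringPruning(scanA1))
    // false: the scans are kept distinct once pruning is part of the comparison
    println(scanA0.sameResultWithPruning(scanA1))
  }
}
```

With the first comparison, both join inputs would be read from the same partition, which matches the duplicated `sum(c)` columns in the incorrect result.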