spark git commit: [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
Repository: spark Updated Branches: refs/heads/branch-2.0 1813bbd9b -> 5fbf5f93e [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions https://github.com/apache/spark/pull/14425 rebased for branch-2.0 Author: Eric Liang. Closes #14427 from ericl/spark-16818-br-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fbf5f93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fbf5f93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fbf5f93 Branch: refs/heads/branch-2.0 Commit: 5fbf5f93ee5aa4d1aca0fa0c8fb769a085dd7b93 Parents: 1813bbd Author: Eric Liang Authored: Mon Aug 1 19:46:20 2016 -0700 Committer: Reynold Xin Committed: Mon Aug 1 19:46:20 2016 -0700 -- .../datasources/FileSourceStrategy.scala| 2 ++ .../datasources/FileSourceStrategySuite.scala | 35 +++- 2 files changed, 36 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 13a86bf..8af9562 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -202,7 +202,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { partitions } + // These metadata values make scan plans uniquely identifiable for equality checking.
val meta = Map( +"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"), "Format" -> files.fileFormat.toString, "ReadSchema" -> prunedDataSchema.simpleString, PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 8d8a18f..7a24f21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -407,6 +407,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("[SPARK-16818] partition pruned file scans implement sameResult correctly") { +withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(100) +.selectExpr("id", "id as b") +.write +.partitionBy("id") +.parquet(tempDir) + val df = spark.read.parquet(tempDir) + def getPlan(df: DataFrame): SparkPlan = { +df.queryExecution.executedPlan + } + assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2" + assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3" +} + } + + test("[SPARK-16818] exchange reuse respects 
differences in partition pruning") { +spark.conf.set("spark.sql.exchange.reuse", true) +withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(10) +.selectExpr("id % 2 as a", "id % 3 as b", "id as c") +.write +.partitionBy("a") +.parquet(tempDir) + val df = spark.read.parquet(tempDir) + val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum") + val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum") + checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil) +} + } + // Helpers for checking the arguments passed to the FileFormat. protected val checkPartitionSchema =
spark git commit: [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
Repository: spark Updated Branches: refs/heads/master a6290e51e -> 957a8ab37 [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions ## What changes were proposed in this pull request? This fixes a bug where the file scan operator does not take into account partition pruning in its implementation of `sameResult()`. As a result, executions may be incorrect on self-joins over the same base file relation. The patch here is minimal, but we should reconsider relying on `metadata` for implementing sameResult() in the future, as string representations may not be uniquely identifying. cc rxin ## How was this patch tested? Unit tests. Author: Eric Liang. Closes #14425 from ericl/spark-16818. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/957a8ab3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/957a8ab3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/957a8ab3 Branch: refs/heads/master Commit: 957a8ab3743521850fb1c0106c37c5d3997b9e56 Parents: a6290e5 Author: Eric Liang Authored: Sat Jul 30 22:48:09 2016 -0700 Committer: Reynold Xin Committed: Sat Jul 30 22:48:09 2016 -0700 -- .../datasources/FileSourceStrategy.scala| 2 ++ .../datasources/FileSourceStrategySuite.scala | 35 +++- 2 files changed, 36 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/957a8ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 32aa471..6749130 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -130,7 +130,9 @@ private[sql] object FileSourceStrategy extends
Strategy with Logging { createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation) } + // These metadata values make scan plans uniquely identifiable for equality checking. val meta = Map( +"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"), "Format" -> fsRelation.fileFormat.toString, "ReadSchema" -> prunedDataSchema.simpleString, PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), http://git-wip-us.apache.org/repos/asf/spark/blob/957a8ab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 2f551b1..1824650 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -408,6 +408,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("[SPARK-16818] partition pruned file scans implement sameResult correctly") { +withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(100) +.selectExpr("id", "id as b") +.write +.partitionBy("id") +.parquet(tempDir) + val df = spark.read.parquet(tempDir) + def getPlan(df: DataFrame): SparkPlan = { +df.queryExecution.executedPlan + 
} + assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2" + assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3" +} + } + + test("[SPARK-16818] exchange reuse respects differences in partition pruning") { +spark.conf.set("spark.sql.exchange.reuse", true) +withTempPath { path => + val tempDir = path.getCanonicalPath + spark.range(10) +