spark git commit: [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions

2016-08-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1813bbd9b -> 5fbf5f93e


[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of 
partitions

https://github.com/apache/spark/pull/14425 rebased for branch-2.0

Author: Eric Liang 

Closes #14427 from ericl/spark-16818-br-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fbf5f93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fbf5f93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fbf5f93

Branch: refs/heads/branch-2.0
Commit: 5fbf5f93ee5aa4d1aca0fa0c8fb769a085dd7b93
Parents: 1813bbd
Author: Eric Liang 
Authored: Mon Aug 1 19:46:20 2016 -0700
Committer: Reynold Xin 
Committed: Mon Aug 1 19:46:20 2016 -0700

--
 .../datasources/FileSourceStrategy.scala|  2 ++
 .../datasources/FileSourceStrategySuite.scala   | 35 +++-
 2 files changed, 36 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..8af9562 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -202,7 +202,9 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
   partitions
   }
 
+  // These metadata values make scan plans uniquely identifiable for 
equality checking.
   val meta = Map(
+"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"),
 "Format" -> files.fileFormat.toString,
 "ReadSchema" -> prunedDataSchema.simpleString,
 PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),

http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18f..7a24f21 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, 
PredicateHelper}
 import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -407,6 +407,39 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
 }
   }
 
+  test("[SPARK-16818] partition pruned file scans implement sameResult 
correctly") {
+withTempPath { path =>
+  val tempDir = path.getCanonicalPath
+  spark.range(100)
+.selectExpr("id", "id as b")
+.write
+.partitionBy("id")
+.parquet(tempDir)
+  val df = spark.read.parquet(tempDir)
+  def getPlan(df: DataFrame): SparkPlan = {
+df.queryExecution.executedPlan
+  }
+  assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+  assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+}
+  }
+
+  test("[SPARK-16818] exchange reuse respects differences in partition 
pruning") {
+spark.conf.set("spark.sql.exchange.reuse", true)
+withTempPath { path =>
+  val tempDir = path.getCanonicalPath
+  spark.range(10)
+.selectExpr("id % 2 as a", "id % 3 as b", "id as c")
+.write
+.partitionBy("a")
+.parquet(tempDir)
+  val df = spark.read.parquet(tempDir)
+  val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
+  val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
+  checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 
10, 5) :: Nil)
+}
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =



spark git commit: [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions

2016-07-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a6290e51e -> 957a8ab37


[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of 
partitions

## What changes were proposed in this pull request?

This fixes a bug where the file scan operator does not take into account 
partition pruning in its implementation of `sameResult()`. As a result, 
executions may be incorrect on self-joins over the same base file relation.

The patch here is minimal, but we should reconsider relying on `metadata` for 
implementing sameResult() in the future, as string representations may not be 
uniquely identifying.

cc rxin

## How was this patch tested?

Unit tests.

Author: Eric Liang 

Closes #14425 from ericl/spark-16818.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/957a8ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/957a8ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/957a8ab3

Branch: refs/heads/master
Commit: 957a8ab3743521850fb1c0106c37c5d3997b9e56
Parents: a6290e5
Author: Eric Liang 
Authored: Sat Jul 30 22:48:09 2016 -0700
Committer: Reynold Xin 
Committed: Sat Jul 30 22:48:09 2016 -0700

--
 .../datasources/FileSourceStrategy.scala|  2 ++
 .../datasources/FileSourceStrategySuite.scala   | 35 +++-
 2 files changed, 36 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/957a8ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 32aa471..6749130 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -130,7 +130,9 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
   createNonBucketedReadRDD(readFile, selectedPartitions, fsRelation)
   }
 
+  // These metadata values make scan plans uniquely identifiable for 
equality checking.
   val meta = Map(
+"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"),
 "Format" -> fsRelation.fileFormat.toString,
 "ReadSchema" -> prunedDataSchema.simpleString,
 PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),

http://git-wip-us.apache.org/repos/asf/spark/blob/957a8ab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 2f551b1..1824650 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, 
PredicateHelper}
 import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -408,6 +408,39 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
 }
   }
 
+  test("[SPARK-16818] partition pruned file scans implement sameResult 
correctly") {
+withTempPath { path =>
+  val tempDir = path.getCanonicalPath
+  spark.range(100)
+.selectExpr("id", "id as b")
+.write
+.partitionBy("id")
+.parquet(tempDir)
+  val df = spark.read.parquet(tempDir)
+  def getPlan(df: DataFrame): SparkPlan = {
+df.queryExecution.executedPlan
+  }
+  assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+  assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+}
+  }
+
+  test("[SPARK-16818] exchange reuse respects differences in partition 
pruning") {
+spark.conf.set("spark.sql.exchange.reuse", true)
+withTempPath { path =>
+  val tempDir = path.getCanonicalPath
+  spark.range(10)
+