Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4c694e452 -> d358298f1
[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan (backport)

This backports https://github.com/apache/spark/pull/15273 to branch-2.0

Also verified the test passes after the patch was applied. rxin

Author: Eric Liang <e...@databricks.com>

Closes #15282 from ericl/spark-17673-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d358298f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d358298f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d358298f

Branch: refs/heads/branch-2.0
Commit: d358298f1082edd31489a1b08f428c8e60278d69
Parents: 4c694e4
Author: Eric Liang <e...@databricks.com>
Authored: Wed Sep 28 16:19:06 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Sep 28 16:19:06 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 ++++-
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6b4b3b8..2779694 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -347,13 +347,16 @@ object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    // These metadata values make scan plans uniquely identifiable for equality checking.
+    // TODO(SPARK-17701) using strings for equality checking is brittle
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]
 
       if (pushedFilters.nonEmpty) {
         pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
       }
-
+      pairs += ("ReadSchema" ->
+        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
       pairs.toMap
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d358298f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index ec419e4..1a6dba8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -770,4 +770,12 @@ class JDBCSuite extends SparkFunSuite
     val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp")
     assert(schema.contains("`order` TEXT"))
   }
+
+  test("SPARK-17673: Exchange reuse respects differences in output schema") {
+    val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL")
+    val df1 = df.groupBy("a").agg("c" -> "min")
+    val df2 = df.groupBy("a").agg("d" -> "min")
+    val res = df1.union(df2)
+    assert(res.distinct().count() == 2) // would be 1 if the exchange was incorrectly reused
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org