Repository: spark
Updated Branches:
  refs/heads/master fc3cd2f50 -> 62b7f306f
[SPARK-14607] [SPARK-14484] [SQL] fix case-insensitive predicates in FileSourceStrategy

## What changes were proposed in this pull request?

When pruning partitions or pushing down predicates, case sensitivity is not respected. To make this work case-insensitively, this PR updates the AttributeReference inside each predicate to use the name from the schema.

## How was this patch tested?

Added regression tests for the case-insensitive path.

Author: Davies Liu <dav...@databricks.com>

Closes #12371 from davies/case_insensi.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62b7f306
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62b7f306
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62b7f306

Branch: refs/heads/master
Commit: 62b7f306fbf77de7f6cbb36181ebebdb4a55acc5
Parents: fc3cd2f
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Apr 13 17:17:19 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Apr 13 17:17:19 2016 -0700

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        | 14 ++++++++--
 .../apache/spark/sql/sources/interfaces.scala   |  5 +---
 .../datasources/FileSourceStrategySuite.scala   | 28 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index bcddf72..80a9156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -64,18 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       // - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
+      // The attribute name in a predicate may differ from the one in the schema when the
+      // analysis is case-insensitive, so rewrite predicates to use the names from the schema;
+      // after that we no longer need to worry about case sensitivity.
+      val normalizedFilters = filters.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(l.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       val partitionColumns =
         l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
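The heart of the change is the `normalizedFilters` rewrite above. To see why it works, here is a minimal, self-contained sketch of the same rewrite against Catalyst's expression API (the attribute names, types, and values are illustrative, and it assumes a Spark build of this vintage on the classpath). The key fact is that `semanticEquals` compares attributes by expression ID rather than by display name, which is what lets a differently-cased reference find its canonical counterpart in the plan's output:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types.IntegerType

    // The attribute as recorded in the relation's output: lower-case "c1".
    val schemaAttr = AttributeReference("c1", IntegerType)()
    // The same attribute (same exprId) as it appears in a predicate that the
    // analyzer resolved case-insensitively from a user-written "C1".
    val predicateAttr = schemaAttr.withName("C1")
    val filter: Expression = EqualTo(predicateAttr, Literal(1))

    val output: Seq[Attribute] = Seq(schemaAttr)

    // The rewrite from the patch: semanticEquals matches on exprId, not name,
    // so the lookup succeeds and withName restores the schema's spelling.
    val normalized = filter transform {
      case a: AttributeReference =>
        a.withName(output.find(_.semanticEquals(a)).get.name)
    }
    // normalized now reads (c1 = 1), so later name-based matching against the
    // partition schema (e.g. when binding the pruning predicate) lines up.

Note that the `.get` is safe only because every attribute in a resolved filter must already appear in the relation's output; an unresolved attribute would have failed analysis earlier.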
       val afterScanFilters = filterSet -- partitionKeyFilters


http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index bea243a..4b9bf8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -593,10 +593,7 @@ class HDFSFileCatalog(
     }
 
     if (partitionPruningPredicates.nonEmpty) {
-      val predicate =
-        partitionPruningPredicates
-          .reduceOption(expressions.And)
-          .getOrElse(Literal(true))
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
 
       val boundPredicate = InterpretedPredicate.create(predicate.transform {
         case a: AttributeReference =>


http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 90d7f53..0b74f07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -196,6 +196,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
   }
 
+  test("partitioned table - case insensitive") {
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 10,
+            "p1=2/file2" -> 10))
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in partition 1")
+      }
+      // We don't need to reevaluate filters that are only on partitions.
+      checkDataFilters(Set.empty)
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in partition 1")
+        assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+          "when checking partition values")
+      }
+      // Only the filters that do not contain the partition column should be pushed down.
+      checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+    }
+  }
+
   test("partitioned table - after scan filters") {
     val table =
       createTable(
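For completeness, here is a hypothetical end-to-end repro of the user-visible behavior this patch addresses, written against the SQLContext-era API of this branch; the path, column names, and data are made up for illustration:

    // Hypothetical repro sketch: assumes a Spark 2.0-era sqlContext is in
    // scope and that /tmp/t is writable; names and path are illustrative only.
    sqlContext.setConf("spark.sql.caseSensitive", "false")

    sqlContext.range(10)
      .selectExpr("id AS c1", "id % 2 AS p1")
      .write.partitionBy("p1").parquet("/tmp/t")

    // The predicate spells the partition column "P1" while the directory
    // layout records it as "p1". With case-insensitive analysis this should
    // still prune down to the single p1=1 directory; before this patch the
    // textual name mismatch could break partition pruning and filter pushdown
    // in FileSourceStrategy.
    sqlContext.read.parquet("/tmp/t").where("P1 = 1").show()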