Repository: spark
Updated Branches:
  refs/heads/master fc3cd2f50 -> 62b7f306f


[SPARK-14607] [SPARK-14484] [SQL] fix case-insensitive predicates in FileSourceStrategy

## What changes were proposed in this pull request?

When pruning partitions or pushing down predicates, case sensitivity was not
respected. To make this work with case-insensitive analysis, this PR updates the
AttributeReference inside each predicate to use the name from the schema.
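
A minimal, Spark-free Scala sketch of the idea (the types and names below are
illustrative stand-ins, not the actual Catalyst API):

```scala
// Rewrite each attribute name in a predicate to the canonical name from the
// schema, matching case-insensitively, so later partition pruning and filter
// pushdown can compare names directly.
case class Attr(name: String)

val schemaAttrs    = Seq(Attr("p1"), Attr("c1"))   // canonical schema names
val predicateAttrs = Seq(Attr("P1"), Attr("C1"))   // as written in the query

// Case-insensitive resolver, analogous to the analyzer's resolver.
val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

val normalized = predicateAttrs.map { a =>
  schemaAttrs.find(s => resolver(s.name, a.name)).fold(a)(s => a.copy(name = s.name))
}
// normalized == Seq(Attr("p1"), Attr("c1"))
```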

## How was this patch tested?

Add regression tests for case-insensitive predicates.
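
As an illustration of the behavior the tests cover, a hypothetical snippet
against the SQLContext API of this era (the table path is made up):

```scala
// With case-insensitive analysis, a filter written as P1 should still prune
// a table partitioned by p1 down to the matching files.
sqlContext.setConf("spark.sql.caseSensitive", "false")
val df = sqlContext.read.parquet("/path/to/table")  // partitioned by p1
df.where("P1 = 1").count()  // before this fix, pruning could miss P1
```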

Author: Davies Liu <dav...@databricks.com>

Closes #12371 from davies/case_insensi.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62b7f306
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62b7f306
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62b7f306

Branch: refs/heads/master
Commit: 62b7f306fbf77de7f6cbb36181ebebdb4a55acc5
Parents: fc3cd2f
Author: Davies Liu <dav...@databricks.com>
Authored: Wed Apr 13 17:17:19 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Apr 13 17:17:19 2016 -0700

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        | 14 ++++++++--
 .../apache/spark/sql/sources/interfaces.scala   |  5 +---
 .../datasources/FileSourceStrategySuite.scala   | 28 ++++++++++++++++++++
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index bcddf72..80a9156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -64,18 +64,28 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       //  - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
+      // The attribute name in a predicate may differ (in case only) from the name in the
+      // schema when the analysis is case-insensitive. Rewrite such names to match the
+      // schema, so we do not need to worry about case sensitivity anymore.
+      val normalizedFilters = filters.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(l.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       val partitionColumns =
         l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
       val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
+        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: 
${partitionKeyFilters.mkString(",")}")
 
       val dataColumns =
         l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
+      val dataFilters = normalizedFilters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
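
For readers following the hunk above, a simplified stand-in for how the filters
are classified (the Filter type here is hypothetical, not Catalyst's):

```scala
// A filter prunes partitions when every attribute it references is a
// partition column, and is pushed down to the data source when it references
// no partition column at all; filters mixing both run after the scan.
case class Filter(references: Set[String])

val partitionSet = Set("p1")
val filters = Seq(
  Filter(Set("p1")),       // prunes partitions
  Filter(Set("c1")),       // pushed down to the data source
  Filter(Set("p1", "c1"))  // evaluated after the scan
)

val partitionKeyFilters = filters.filter(_.references.subsetOf(partitionSet))
val dataFilters         = filters.filter(_.references.intersect(partitionSet).isEmpty)
```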

http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index bea243a..4b9bf8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -593,10 +593,7 @@ class HDFSFileCatalog(
     }
 
     if (partitionPruningPredicates.nonEmpty) {
-      val predicate =
-        partitionPruningPredicates
-            .reduceOption(expressions.And)
-            .getOrElse(Literal(true))
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
 
       val boundPredicate = InterpretedPredicate.create(predicate.transform {
         case a: AttributeReference =>
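
The simplification in this hunk is safe because the branch is guarded by
`partitionPruningPredicates.nonEmpty`, so the `Literal(true)` fallback of
`reduceOption` was unreachable. With plain booleans standing in for Catalyst
And expressions:

```scala
// On any non-empty sequence, reduceOption-with-default and reduce agree,
// and reduce cannot throw, so the guard makes the shorter form equivalent.
val predicates = Seq(true, false, true)
if (predicates.nonEmpty) {
  assert(predicates.reduceOption(_ && _).getOrElse(true) == predicates.reduce(_ && _))
}
```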

http://git-wip-us.apache.org/repos/asf/spark/blob/62b7f306/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 90d7f53..0b74f07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -196,6 +196,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
   }
 
+  test("partitioned table - case insensitive") {
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 10,
+            "p1=2/file2" -> 10))
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when files in partition 1")
+      }
+      // We don't need to reevaluate filters that are only on partitions.
+      checkDataFilters(Set.empty)
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { 
partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in 
partition 1")
+        assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+          "when checking partition values")
+      }
+      // Only the filters that do not contain the partition column should be pushed down.
+      checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+    }
+  }
+
   test("partitioned table - after scan filters") {
     val table =
       createTable(

