Repository: spark
Updated Branches:
  refs/heads/master 148a84b37 -> 0bf605c2c


[SPARK-19292][SQL] filter with partition columns should be case-insensitive on Hive tables

## What changes were proposed in this pull request?

When we query a table with a filter on partition columns, we push the partition filter down to the metastore to fetch the matching partitions directly.

In `HiveExternalCatalog.listPartitionsByFilter`, we assume the column names in the partition filter are already normalized, so we don't need to consider case sensitivity there. However, `HiveTableScanExec` doesn't follow this assumption. This PR fixes it.
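
For illustration, a minimal reproduction sketch mirroring the regression test added below. It assumes a Hive-enabled spark-shell session (the `spark` variable); the table name and data are taken from that test, and the runtime config call is one way to get case-insensitive resolution:

```scala
// With case-insensitive resolution, `J` in the filter resolves to the
// partition column `j`, but before this fix the un-normalized name `J`
// was handed to the Hive metastore, so partition pruning could misbehave.
spark.conf.set("spark.sql.caseSensitive", "false")
spark.sql("CREATE TABLE tbl(i int, j int) USING hive PARTITIONED BY (j)")
spark.sql("INSERT INTO tbl PARTITION(j=10) SELECT 1")
spark.sql("SELECT i, j FROM tbl WHERE J = 10").show()  // note the upper-case J
```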

## How was this patch tested?

Added a new regression test.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16647 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bf605c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bf605c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bf605c2

Branch: refs/heads/master
Commit: 0bf605c2c67ca361cd4aa3a3b4492bef4aef76b9
Parents: 148a84b
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Jan 19 20:09:48 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Jan 19 20:09:48 2017 -0800

----------------------------------------------------------------------
 .../sql/execution/datasources/FileSourceStrategy.scala |  2 +-
 .../spark/sql/hive/execution/HiveTableScanExec.scala   | 12 +++++++++++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala       | 13 +++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0bf605c2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 6d0671d..26e1380 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -62,7 +62,7 @@ object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)
 
       // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we donot need to
+      // case insensitive, we should change them to match the one in schema, so we do not need to
       // worry about case sensitivity anymore.
       val normalizedFilters = filters.map { e =>
         e transform {

http://git-wip-us.apache.org/repos/asf/spark/blob/0bf605c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 7ee5fc5..def6ef3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -146,9 +146,19 @@ case class HiveTableScanExec(
         hadoopReader.makeRDDForTable(relation.hiveQlTable)
       }
     } else {
+      // The attribute name of predicate could be different than the one in schema in case of
+      // case insensitive, we should change them to match the one in schema, so we do not need to
+      // worry about case sensitivity anymore.
+      val normalizedFilters = partitionPruningPred.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       Utils.withDummyCallSite(sqlContext.sparkContext) {
         hadoopReader.makeRDDForPartitionedTable(
-          prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+          prunePartitions(relation.getHiveQlPartitions(normalizedFilters)))
       }
     }
     val numOutputRows = longMetric("numOutputRows")

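The core of the fix is the `normalizedFilters` transform above: each attribute in the partition-pruning predicates is rewritten to carry the exact name from the table schema before the predicates are handed to the metastore. Below is a self-contained sketch of that transform using Catalyst internals; it is illustrative only, and the `IntegerType` attribute is a made-up example, not code from this commit:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.IntegerType

// The partition column as declared in the table schema (lower-case name).
val schemaAttr = AttributeReference("j", IntegerType)()

// The same attribute as it appears in the user's predicate: under
// case-insensitive analysis it resolves to the same expression ID,
// but keeps the spelling the user typed.
val predicateAttr = schemaAttr.withName("J")
val output: Seq[AttributeReference] = Seq(schemaAttr)

// Rewrite every attribute in a filter to use the schema's spelling.
// `semanticEquals` compares by expression ID, so the cosmetic name
// difference does not prevent the lookup from succeeding.
def normalize(filter: Expression): Expression = filter transform {
  case a: AttributeReference =>
    a.withName(output.find(_.semanticEquals(a)).get.name)
}

val filter = EqualTo(predicateAttr, Literal(10))
// normalize(filter) now references `j`, matching what the metastore expects.
```
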
http://git-wip-us.apache.org/repos/asf/spark/blob/0bf605c2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 104b525..1a28c4c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2014,4 +2014,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       )
     }
   }
+
+  test("SPARK-19292: filter with partition columns should be case-insensitive 
on Hive tables") {
+    withTable("tbl") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        sql("CREATE TABLE tbl(i int, j int) USING hive PARTITIONED BY (j)")
+        sql("INSERT INTO tbl PARTITION(j=10) SELECT 1")
+        checkAnswer(spark.table("tbl"), Row(1, 10))
+
+        checkAnswer(sql("SELECT i, j FROM tbl WHERE J=10"), Row(1, 10))
+        checkAnswer(spark.table("tbl").filter($"J" === 10), Row(1, 10))
+      }
+    }
+  }
 }

