[SPARK-22672][SQL][TEST] Refactor ORC Tests

## What changes were proposed in this pull request?

Since SPARK-20682, we have two `OrcFileFormat`s. This PR refactors the ORC tests
according to three principles (with a few exceptions):
1. Move the test suites into `sql/core`.
2. Create `HiveXXX` test suites in `sql/hive` by reusing the `sql/core` test suites.
3. `OrcTest` provides the common helper functions and `val orcImp: String`.

**Test Suites**

*Native OrcFileFormat*
- org.apache.spark.sql.execution.datasources.orc
  - OrcFilterSuite
  - OrcPartitionDiscoverySuite
  - OrcQuerySuite
  - OrcSourceSuite
- o.a.s.sql.hive.orc
  - OrcHadoopFsRelationSuite

*Hive built-in OrcFileFormat*

- o.a.s.sql.hive.orc
  - HiveOrcFilterSuite
  - HiveOrcPartitionDiscoverySuite
  - HiveOrcQuerySuite
  - HiveOrcSourceSuite
  - HiveOrcHadoopFsRelationSuite

**Hierarchy**
```
OrcTest
    -> OrcSuite
        -> OrcSourceSuite
    -> OrcQueryTest
        -> OrcQuerySuite
    -> OrcPartitionDiscoveryTest
        -> OrcPartitionDiscoverySuite
    -> OrcFilterSuite

HadoopFsRelationTest
    -> OrcHadoopFsRelationSuite
        -> HiveOrcHadoopFsRelationSuite
```
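
To make principles 2 and 3 concrete, the sketch below shows roughly how the reuse works. It is illustrative only, not code from this patch: `OrcQueryTest`, `OrcQuerySuite`, and `val orcImp` appear in the diff below, but the `beforeAll` body and the `TestHiveSingleton` mixin on the Hive side are assumptions based on the usual Spark test conventions.

```scala
// Minimal sketch, assuming SQLConf.ORC_IMPLEMENTATION selects the ORC reader/writer.
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

abstract class OrcTest extends QueryTest with SQLTestUtils {
  // Each concrete suite pins the ORC implementation it exercises.
  val orcImp: String = "native"

  protected override def beforeAll(): Unit = {
    super.beforeAll()
    spark.conf.set(SQLConf.ORC_IMPLEMENTATION.key, orcImp)
  }
}

// sql/core: the shared test cases live in OrcQueryTest and run against the
// native OrcFileFormat.
abstract class OrcQueryTest extends OrcTest { /* shared test cases */ }
class OrcQuerySuite extends OrcQueryTest with SharedSQLContext

// sql/hive: the same test cases run against the Hive built-in OrcFileFormat.
// (TestHiveSingleton is the usual Hive-side mixin; shown here as an assumption.)
// class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
//   override val orcImp: String = "hive"
// }
```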

Please note the following.
- Unlike the other test suites, `OrcHadoopFsRelationSuite` doesn't inherit `OrcTest`.
It lives in `sql/hive`, like `ParquetHadoopFsRelationSuite`, because of its
dependencies, and it follows the existing convention of using
`val dataSourceName: String`.
- The `OrcFilterSuite`s cannot reuse test cases because the function signatures
differ between the Hive 1.2.1 ORC classes and the Apache ORC 1.4.1 classes, as
illustrated by the sketch below.
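
As a rough illustration of that signature clash (not code from this patch): each filter suite types its checker callback against `SearchArgument`, and that class comes from a different package per implementation, so the helpers cannot be shared through `OrcTest` even though their bodies read the same.

```scala
// OrcFilterSuite (sql/core) compiles against the Apache ORC 1.4.1 classes:
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}

// HiveOrcFilterSuite (sql/hive) compiles against the Hive 1.2.1 classes instead:
//   import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
//
// A helper such as
//   private def checkFilterPredicate(
//       df: DataFrame, predicate: Predicate, checker: SearchArgument => Unit): Unit
// therefore refers to two unrelated SearchArgument types, one per suite, and has
// to be duplicated rather than pulled up into OrcTest.
```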

## How was this patch tested?

Pass the Jenkins tests with the reorganized test suites.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #19882 from dongjoon-hyun/SPARK-22672.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1e5688d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1e5688d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1e5688d

Branch: refs/heads/master
Commit: c1e5688d1a44c152d27dcb9a04da22993d7ab826
Parents: d32337b
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Thu Dec 7 20:42:46 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Dec 7 20:42:46 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  28 -
 .../datasources/orc/OrcFilterSuite.scala        | 367 +++++++++++
 .../orc/OrcPartitionDiscoverySuite.scala        | 229 +++++++
 .../datasources/orc/OrcQuerySuite.scala         | 611 ++++++++++++++++++
 .../datasources/orc/OrcSourceSuite.scala        | 200 ++++++
 .../sql/execution/datasources/orc/OrcTest.scala | 109 ++++
 .../spark/sql/hive/orc/HiveOrcFilterSuite.scala | 387 +++++++++++
 .../orc/HiveOrcPartitionDiscoverySuite.scala    |  25 +
 .../spark/sql/hive/orc/HiveOrcQuerySuite.scala  | 165 +++++
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 107 ++++
 .../spark/sql/hive/orc/OrcFilterSuite.scala     | 347 ----------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   8 +-
 .../hive/orc/OrcPartitionDiscoverySuite.scala   | 255 --------
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 641 -------------------
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 281 --------
 .../org/apache/spark/sql/hive/orc/OrcTest.scala |  78 ---
 16 files changed, 2207 insertions(+), 1631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8ddddbe..5e07728 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2775,32 +2775,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
-
-  test("SPARK-21791 ORC should support column names with dot") {
-    val orc = classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName
-    withTempDir { dir =>
-      val path = new File(dir, "orc").getCanonicalPath
-      Seq(Some(1), None).toDF("col.dots").write.format(orc).save(path)
-      assert(spark.read.format(orc).load(path).collect().length == 2)
-    }
-  }
-
-  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and 
sql/core") {
-    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
-      val e = intercept[AnalysisException] {
-        sql("CREATE TABLE spark_20728(a INT) USING ORC")
-      }
-      assert(e.message.contains("Hive built-in ORC data source must be used with Hive support"))
-    }
-
-    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
-      withTable("spark_20728") {
-        sql("CREATE TABLE spark_20728(a INT) USING ORC")
-        val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
-          case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
-        }
-        assert(fileFormat == Some(classOf[OrcFileFormat]))
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
new file mode 100644
index 0000000..a5f6b68
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
+
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+/**
+ * A test suite that tests Apache ORC filter API based filter pushdown optimization.
+ * OrcFilterSuite and HiveOrcFilterSuite are logically duplicated to provide the same test coverage.
+ * The difference is the packages containing the 'Predicate' and 'SearchArgument' classes.
+ * - OrcFilterSuite uses the 'org.apache.orc.storage.ql.io.sarg' package.
+ * - HiveOrcFilterSuite uses the 'org.apache.hadoop.hive.ql.io.sarg' package.
+ */
+class OrcFilterSuite extends OrcTest with SharedSQLContext {
+
+  private def checkFilterPredicate(
+      df: DataFrame,
+      predicate: Predicate,
+      checker: (SearchArgument) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
+    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
+    checker(maybeFilter.get)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (implicit df: DataFrame): Unit = {
+    def checkComparisonOperator(filter: SearchArgument) = {
+      val operator = filter.getLeaves.asScala
+      assert(operator.map(_.getOperator).contains(filterOperator))
+    }
+    checkFilterPredicate(df, predicate, checkComparisonOperator)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, stringExpr: String)
+      (implicit df: DataFrame): Unit = {
+    def checkLogicalOperator(filter: SearchArgument) = {
+      assert(filter.toString == stringExpr)
+    }
+    checkFilterPredicate(df, predicate, checkLogicalOperator)
+  }
+
+  private def checkNoFilterPredicate
+      (predicate: Predicate)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters)
+    assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters")
+  }
+
+  test("filter pushdown - integer") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - long") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit 
df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - float") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit 
df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - double") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit 
df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - string") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> "1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal("1") === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal("1") <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal("2") > '_1, 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal("3") < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("1") >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("4") <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - boolean") {
+    withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) 
{ implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === true, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> true, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < true, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > false, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= false, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= false, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(false) === '_1, 
PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(false) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(false) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(true) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - decimal") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { 
implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === BigDecimal.valueOf(1), 
PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < BigDecimal.valueOf(2), 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > BigDecimal.valueOf(3), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), 
PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(3)) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(4)) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - timestamp") {
+    val timeString = "2015-08-20 14:57:00"
+    val timestamps = (1 to 4).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      new Timestamp(milliseconds)
+    }
+    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === timestamps(0), 
PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> timestamps(0), 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < timestamps(1), 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > timestamps(2), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= timestamps(0), 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= timestamps(3), 
PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(timestamps(0)) === '_1, 
PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(timestamps(1)) > '_1, 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(2)) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(3)) <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - combinations with logical operators") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      checkFilterPredicate(
+        '_1.isNotNull,
+        "leaf-0 = (IS_NULL _1), expr = (not leaf-0)"
+      )
+      checkFilterPredicate(
+        '_1 =!= 1,
+        "leaf-0 = (IS_NULL _1), leaf-1 = (EQUALS _1 1), expr = (and (not 
leaf-0) (not leaf-1))"
+      )
+      checkFilterPredicate(
+        !('_1 < 4),
+        "leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 4), expr = (and (not 
leaf-0) (not leaf-1))"
+      )
+      checkFilterPredicate(
+        '_1 < 2 || '_1 > 3,
+        "leaf-0 = (LESS_THAN _1 2), leaf-1 = (LESS_THAN_EQUALS _1 3), " +
+          "expr = (or leaf-0 (not leaf-1))"
+      )
+      checkFilterPredicate(
+        '_1 < 2 && '_1 > 3,
+        "leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 2), leaf-2 = 
(LESS_THAN_EQUALS _1 3), " +
+          "expr = (and (not leaf-0) leaf-1 (not leaf-2))"
+      )
+    }
+  }
+
+  test("no filter pushdown - non-supported types") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
+    }
+    // ArrayType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
+      checkNoFilterPredicate('_1.isNull)
+    }
+    // BinaryType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
+      checkNoFilterPredicate('_1 <=> 1.b)
+    }
+    // DateType
+    val stringDate = "2015-01-01"
+    withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
+      checkNoFilterPredicate('_1 === Date.valueOf(stringDate))
+    }
+    // MapType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df =>
+      checkNoFilterPredicate('_1.isNotNull)
+    }
+  }
+
+  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+    import org.apache.spark.sql.sources._
+    // The `LessThan` should be converted while the `StringContains` shouldn't
+    val schema = new StructType(
+      Array(
+        StructField("a", IntegerType, nullable = true),
+        StructField("b", StringType, nullable = true)))
+    assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
+      OrcFilters.createFilter(schema, Array(
+        LessThan("a", 10),
+        StringContains("b", "prefix")
+      )).get.toString
+    }
+
+    // The `LessThan` should be converted while the whole inner `And` shouldn't
+    assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") {
+      OrcFilters.createFilter(schema, Array(
+        LessThan("a", 10),
+        Not(And(
+          GreaterThan("a", 1),
+          StringContains("b", "prefix")
+        ))
+      )).get.toString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
new file mode 100644
index 0000000..d1911ea
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.test.SharedSQLContext
+
+// The data where the partitioning key exists only in the directory structure.
+case class OrcParData(intField: Int, stringField: String)
+
+// The data that also includes the partitioning key
+case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)
+
+abstract class OrcPartitionDiscoveryTest extends OrcTest {
+  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
+
+  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+    try f finally spark.catalog.dropTempView(tableName)
+  }
+
+  protected def makePartitionDir(
+      basePath: File,
+      defaultPartitionName: String,
+      partitionCols: (String, Any)*): File = {
+    val partNames = partitionCols.map { case (k, v) =>
+      val valueString = if (v == null || v == "") defaultPartitionName else v.toString
+      s"$k=$valueString"
+    }
+
+    val partDir = partNames.foldLeft(basePath) { (parent, child) =>
+      new File(parent, child)
+    }
+
+    assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+    partDir
+  }
+
+  test("read partitioned table - normal case") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", "bar")
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", "bar")
+          } yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            _ <- Seq("foo", "bar")
+          } yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", "bar")
+          } yield Row(i, i.toString, 1, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, i.toString, pi, "foo"))
+      }
+    }
+  }
+
+  test("read partitioned table - partition key included in orc file") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", "bar")
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", "bar")
+          } yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            _ <- Seq("foo", "bar")
+          } yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", "bar")
+          } yield Row(i, 1, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, pi, i.toString, "foo"))
+      }
+    }
+  }
+
+
+  test("read partitioned table - with nulls") {
+    withTempDir { base =>
+      for {
+      // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+        pi <- Seq(1, null.asInstanceOf[Integer])
+        ps <- Seq("foo", null.asInstanceOf[String])
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read
+        .option("hive.exec.default.partition.name", defaultPartitionName)
+        .orc(base.getCanonicalPath)
+        .createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi IS NULL"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, null, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+
+  test("read partitioned table - with nulls and partition keys are included in 
Orc file") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", null.asInstanceOf[String])
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read
+        .option("hive.exec.default.partition.name", defaultPartitionName)
+        .orc(base.getCanonicalPath)
+        .createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, pi, i.toString, null))
+      }
+    }
+  }
+}
+
+class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
new file mode 100644
index 0000000..e00e057
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.orc.{OrcConf, OrcFile}
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.orc.mapred.OrcStruct
+import org.apache.orc.mapreduce.OrcInputFormat
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
+
+case class AllDataTypesWithNonPrimitiveType(
+    stringField: String,
+    intField: Int,
+    longField: Long,
+    floatField: Float,
+    doubleField: Double,
+    shortField: Short,
+    byteField: Byte,
+    booleanField: Boolean,
+    array: Seq[Int],
+    arrayContainsNull: Seq[Option[Int]],
+    map: Map[Int, Long],
+    mapValueContainsNull: Map[Int, Option[Long]],
+    data: (Seq[Int], (Int, String)))
+
+case class BinaryData(binaryData: Array[Byte])
+
+case class Contact(name: String, phone: String)
+
+case class Person(name: String, age: Int, contacts: Seq[Contact])
+
+abstract class OrcQueryTest extends OrcTest {
+  import testImplicits._
+
+  test("Read/write All Types") {
+    val data = (0 to 255).map { i =>
+      (s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 
== 0)
+    }
+
+    withOrcFile(data) { file =>
+      checkAnswer(
+        spark.read.orc(file),
+        data.toDF().collect())
+    }
+  }
+
+  test("Read/write binary data") {
+    withOrcFile(BinaryData("test".getBytes(StandardCharsets.UTF_8)) :: Nil) { file =>
+      val bytes = spark.read.orc(file).head().getAs[Array[Byte]](0)
+      assert(new String(bytes, StandardCharsets.UTF_8) === "test")
+    }
+  }
+
+  test("Read/write all types with non-primitive type") {
+    val data: Seq[AllDataTypesWithNonPrimitiveType] = (0 to 255).map { i =>
+      AllDataTypesWithNonPrimitiveType(
+        s"$i", i, i.toLong, i.toFloat, i.toDouble, i.toShort, i.toByte, i % 2 
== 0,
+        0 until i,
+        (0 until i).map(Option(_).filter(_ % 3 == 0)),
+        (0 until i).map(i => i -> i.toLong).toMap,
+        (0 until i).map(i => i -> Option(i.toLong)).toMap + (i -> None),
+        (0 until i, (i, s"$i")))
+    }
+
+    withOrcFile(data) { file =>
+      checkAnswer(
+        spark.read.orc(file),
+        data.toDF().collect())
+    }
+  }
+
+  test("Read/write UserDefinedType") {
+    withTempPath { path =>
+      val data = Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))))
+      val udtDF = data.toDF("id", "vectors")
+      udtDF.write.orc(path.getAbsolutePath)
+      val readBack = spark.read.schema(udtDF.schema).orc(path.getAbsolutePath)
+      checkAnswer(udtDF, readBack)
+    }
+  }
+
+  test("Creating case class RDD table") {
+    val data = (1 to 100).map(i => (i, s"val_$i"))
+    sparkContext.parallelize(data).toDF().createOrReplaceTempView("t")
+    withTempView("t") {
+      checkAnswer(sql("SELECT * FROM t"), data.toDF().collect())
+    }
+  }
+
+  test("Simple selection form ORC table") {
+    val data = (1 to 10).map { i =>
+      Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", 
s"phone_$m") })
+    }
+
+    withOrcTable(data, "t") {
+      // ppd:
+      // leaf-0 = (LESS_THAN_EQUALS age 5)
+      // expr = leaf-0
+      assert(sql("SELECT name FROM t WHERE age <= 5").count() === 5)
+
+      // ppd:
+      // leaf-0 = (LESS_THAN_EQUALS age 5)
+      // expr = (not leaf-0)
+      assertResult(10) {
+        sql("SELECT name, contacts FROM t where age > 5")
+          .rdd
+          .flatMap(_.getAs[Seq[_]]("contacts"))
+          .count()
+      }
+
+      // ppd:
+      // leaf-0 = (LESS_THAN_EQUALS age 5)
+      // leaf-1 = (LESS_THAN age 8)
+      // expr = (and (not leaf-0) leaf-1)
+      {
+        val df = sql("SELECT name, contacts FROM t WHERE age > 5 AND age < 8")
+        assert(df.count() === 2)
+        assertResult(4) {
+          df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
+        }
+      }
+
+      // ppd:
+      // leaf-0 = (LESS_THAN age 2)
+      // leaf-1 = (LESS_THAN_EQUALS age 8)
+      // expr = (or leaf-0 (not leaf-1))
+      {
+        val df = sql("SELECT name, contacts FROM t WHERE age < 2 OR age > 8")
+        assert(df.count() === 3)
+        assertResult(6) {
+          df.rdd.flatMap(_.getAs[Seq[_]]("contacts")).count()
+        }
+      }
+    }
+  }
+
+  test("save and load case class RDD with `None`s as orc") {
+    val data = (
+      Option.empty[Int],
+      Option.empty[Long],
+      Option.empty[Float],
+      Option.empty[Double],
+      Option.empty[Boolean]
+    ) :: Nil
+
+    withOrcFile(data) { file =>
+      checkAnswer(
+        spark.read.orc(file),
+        Row(Seq.fill(5)(null): _*))
+    }
+  }
+
+  test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when 
compression is unset") {
+    // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option(COMPRESS.getAttribute, "ZLIB")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("ZLIB" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+
+    // `compression` overrides `orc.compress`.
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "ZLIB")
+        .option(COMPRESS.getAttribute, "SNAPPY")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("ZLIB" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+  }
+
+  // Hive supports zlib, snappy and none for Hive 1.2.1.
+  test("Compression options for writing to an ORC file (SNAPPY, ZLIB and 
NONE)") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "ZLIB")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("ZLIB" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "SNAPPY")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".snappy.orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("SNAPPY" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "NONE")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("NONE" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+  }
+
+  test("simple select queries") {
+    withOrcTable((0 until 10).map(i => (i, i.toString)), "t") {
+      checkAnswer(
+        sql("SELECT `_1` FROM t where t.`_1` > 5"),
+        (6 until 10).map(Row.apply(_)))
+
+      checkAnswer(
+        sql("SELECT `_1` FROM t as tmp where tmp.`_1` < 5"),
+        (0 until 5).map(Row.apply(_)))
+    }
+  }
+
+  test("appending") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    spark.createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp")
+    withOrcTable(data, "t") {
+      sql("INSERT INTO TABLE t SELECT * FROM tmp")
+      checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple))
+    }
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("tmp"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  test("overwriting") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    spark.createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp")
+    withOrcTable(data, "t") {
+      sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
+      checkAnswer(spark.table("t"), data.map(Row.fromTuple))
+    }
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("tmp"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
+  test("self-join") {
+    // 4 rows, cells of column 1 of row 2 and row 4 are null
+    val data = (1 to 4).map { i =>
+      val maybeInt = if (i % 2 == 0) None else Some(i)
+      (maybeInt, i.toString)
+    }
+
+    withOrcTable(data, "t") {
+      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x.`_1` = y.`_1`")
+      val queryOutput = selfJoin.queryExecution.analyzed.output
+
+      assertResult(4, "Field count mismatches")(queryOutput.size)
+      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+        queryOutput.filter(_.name == "_1").map(_.exprId).size
+      }
+
+      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+    }
+  }
+
+  test("nested data - struct with array field") {
+    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+    withOrcTable(data, "t") {
+      checkAnswer(sql("SELECT `_1`.`_2`[0] FROM t"), data.map {
+        case Tuple1((_, Seq(string))) => Row(string)
+      })
+    }
+  }
+
+  test("nested data - array of struct") {
+    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+    withOrcTable(data, "t") {
+      checkAnswer(sql("SELECT `_1`[0].`_2` FROM t"), data.map {
+        case Tuple1(Seq((_, string))) => Row(string)
+      })
+    }
+  }
+
+  test("columns only referenced by pushed down filters should remain") {
+    withOrcTable((1 to 10).map(Tuple1.apply), "t") {
+      checkAnswer(sql("SELECT `_1` FROM t WHERE `_1` < 10"), (1 to 
9).map(Row.apply(_)))
+    }
+  }
+
+  test("SPARK-5309 strings stored using dictionary compression in orc") {
+    withOrcTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
+      checkAnswer(
+        sql("SELECT `_1`, `_2`, SUM(`_3`) FROM t GROUP BY `_1`, `_2`"),
+        (0 until 10).map(i => Row("same", "run_" + i, 100)))
+
+      checkAnswer(
+        sql("SELECT `_1`, `_2`, SUM(`_3`) FROM t WHERE `_2` = 'run_5' GROUP BY 
`_1`, `_2`"),
+        List(Row("same", "run_5", 100)))
+    }
+  }
+
+  test("SPARK-9170: Don't implicitly lowercase of user-provided columns") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      spark.range(0, 10).select('id as "Acol").write.orc(path)
+      spark.read.orc(path).schema("Acol")
+      intercept[IllegalArgumentException] {
+        spark.read.orc(path).schema("acol")
+      }
+      checkAnswer(spark.read.orc(path).select("acol").sort("acol"),
+        (0 until 10).map(Row(_)))
+    }
+  }
+
+  test("SPARK-10623 Enable ORC PPD") {
+    withTempPath { dir =>
+      withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+        import testImplicits._
+        val path = dir.getCanonicalPath
+
+        // For field "a", the first column has odds integers. This is to check 
the filtered count
+        // when `isNull` is performed. For Field "b", `isNotNull` of ORC file 
filters rows
+        // only when all the values are null (maybe this works differently 
when the data
+        // or query is complicated). So, simply here a column only having 
`null` is added.
+        val data = (0 until 10).map { i =>
+          val maybeInt = if (i % 2 == 0) None else Some(i)
+          val nullValue: Option[String] = None
+          (maybeInt, nullValue)
+        }
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        spark.createDataFrame(data).toDF("a", 
"b").repartition(10).write.orc(path)
+        val df = spark.read.orc(path)
+
+        def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
+          val sourceDf = stripSparkFilter(df.where(pred))
+          val data = sourceDf.collect().toSet
+          val expectedData = answer.toSet
+
+          // When a filter is pushed to ORC, ORC can apply it to rows. So, we can check
+          // the number of rows returned from ORC to make sure our filter pushdown works.
+          // A tricky part is that ORC does not fully filter rows but returns some possible
+          // results. So, this checks if the number of results is less than the original count
+          // of data, and then checks if it contains the expected data.
+          assert(
+            sourceDf.count < 10 && expectedData.subsetOf(data),
+            s"No data was filtered for predicate: $pred")
+        }
+
+        checkPredicate('a === 5, List(5).map(Row(_, null)))
+        checkPredicate('a <=> 5, List(5).map(Row(_, null)))
+        checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
+        checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
+        checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
+        checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
+        checkPredicate('a.isNull, List(null).map(Row(_, null)))
+        checkPredicate('b.isNotNull, List())
+        checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
+        checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
+        checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
+        checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
+        checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
+      }
+    }
+  }
+
+  test("SPARK-14962 Produce correct results on array type with isnotnull") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(i => Tuple1(Array(i)))
+      withOrcFile(data) { file =>
+        val actual = spark
+          .read
+          .orc(file)
+          .where("_1 is not null")
+        val expected = data.toDF()
+        checkAnswer(actual, expected)
+      }
+    }
+  }
+
+  test("SPARK-15198 Support for pushing down filters for boolean types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(_ => (true, false))
+      withOrcFile(data) { file =>
+        val df = spark.read.orc(file).where("_2 == true")
+        val actual = stripSparkFilter(df).count()
+
+        // ORC filter should be applied and the total count should be 0.
+        assert(actual === 0)
+      }
+    }
+  }
+
+  test("Support for pushing down filters for decimal types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        spark.createDataFrame(data).toDF("a").repartition(10)
+          .write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
+  test("Support for pushing down filters for timestamp types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val timeString = "2015-08-20 14:57:00"
+      val data = (0 until 10).map { i =>
+        val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+        Tuple1(new Timestamp(milliseconds))
+      }
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        spark.createDataFrame(data).toDF("a").repartition(10)
+          .write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where(s"a == 
'$timeString'")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
+  test("column nullability and comment - write and then read") {
+    val schema = (new StructType)
+      .add("cl1", IntegerType, nullable = false, comment = "test")
+      .add("cl2", IntegerType, nullable = true)
+      .add("cl3", IntegerType, nullable = true)
+    val row = Row(3, null, 4)
+    val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    val tableName = "tab"
+    withTable(tableName) {
+      df.write.format("orc").mode("overwrite").saveAsTable(tableName)
+      // Verify the DDL command result: DESCRIBE TABLE
+      checkAnswer(
+        sql(s"desc $tableName").select("col_name", "comment").where($"comment" 
=== "test"),
+        Row("cl1", "test") :: Nil)
+      // Verify the schema
+      val expectedFields = schema.fields.map(f => f.copy(nullable = true))
+      assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
+    }
+  }
+
+  test("Empty schema does not read data from ORC file") {
+    val data = Seq((1, 1), (2, 2))
+    withOrcFile(data) { path =>
+      val conf = new Configuration()
+      conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, "")
+      conf.setBoolean("hive.io.file.read.all.columns", false)
+
+      val orcRecordReader = {
+        val file = new File(path).listFiles().find(_.getName.endsWith(".snappy.orc")).head
+        val split = new FileSplit(new Path(file.toURI), 0, file.length, Array.empty[String])
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+        val oif = new OrcInputFormat[OrcStruct]
+        oif.createRecordReader(split, hadoopAttemptContext)
+      }
+
+      val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
+      try {
+        assert(recordsIterator.next().toString == "{null, null}")
+      } finally {
+        recordsIterator.close()
+      }
+    }
+  }
+
+  test("read from multiple orc input paths") {
+    val path1 = Utils.createTempDir()
+    val path2 = Utils.createTempDir()
+    makeOrcFile((1 to 10).map(Tuple1.apply), path1)
+    makeOrcFile((1 to 10).map(Tuple1.apply), path2)
+    val df = spark.read.orc(path1.getCanonicalPath, path2.getCanonicalPath)
+    assert(df.count() == 20)
+  }
+}
+
+class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("LZO compression options for writing to an ORC file") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "LZO")
+        .orc(file.getCanonicalPath)
+
+      val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".lzo.orc"))
+      assert(maybeOrcFile.isDefined)
+
+      val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
+      val conf = OrcFile.readerOptions(new Configuration())
+      assert("LZO" === OrcFile.createReader(orcFilePath, 
conf).getCompressionKind.name)
+    }
+  }
+
+  test("Schema discovery on empty ORC files") {
+    // SPARK-8501 is fixed.
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("empty_orc") {
+        withTempView("empty", "single") {
+          spark.sql(
+            s"""CREATE TABLE empty_orc(key INT, value STRING)
+               |USING ORC
+               |LOCATION '${dir.toURI}'
+             """.stripMargin)
+
+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.createOrReplaceTempView("empty")
+
+          // This creates 1 empty ORC file with ORC SerDe. We are using this trick because
+          // the Spark SQL ORC data source always avoids writing empty ORC files.
+          spark.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM empty
+             """.stripMargin)
+
+          val df = spark.read.orc(path)
+          assert(df.schema === emptyDF.schema.asNullable)
+          checkAnswer(df, emptyDF)
+        }
+      }
+    }
+  }
+
+  test("SPARK-21791 ORC should support column names with dot") {
+    withTempDir { dir =>
+      val path = new File(dir, "orc").getCanonicalPath
+      Seq(Some(1), None).toDF("col.dots").write.orc(path)
+      assert(spark.read.orc(path).collect().length == 2)
+    }
+  }
+
+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and 
sql/core") {
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") {
+      val e = intercept[AnalysisException] {
+        sql("CREATE TABLE spark_20728(a INT) USING ORC")
+      }
+      assert(e.message.contains("Hive built-in ORC data source must be used with Hive support"))
+    }
+
+    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
+      withTable("spark_20728") {
+        sql("CREATE TABLE spark_20728(a INT) USING ORC")
+        val fileFormat = sql("SELECT * FROM 
spark_20728").queryExecution.analyzed.collectFirst {
+          case l: LogicalRelation => 
l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
+        }
+        assert(fileFormat == Some(classOf[OrcFileFormat]))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
new file mode 100644
index 0000000..6f5f2fd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+import java.util.Locale
+
+import org.apache.orc.OrcConf.COMPRESS
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+case class OrcData(intField: Int, stringField: String)
+
+abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
+  import testImplicits._
+
+  var orcTableDir: File = null
+  var orcTableAsDir: File = null
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    orcTableAsDir = Utils.createTempDir("orctests", "sparksql")
+    orcTableDir = Utils.createTempDir("orctests", "sparksql")
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => OrcData(i, s"part-$i"))
+      .toDF()
+      .createOrReplaceTempView("orc_temp_table")
+  }
+
+  test("create temporary orc table") {
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_source"),
+      (1 to 10).map(i => Row(i, s"part-$i")))
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_source where intField > 5"),
+      (6 to 10).map(i => Row(i, s"part-$i")))
+
+    checkAnswer(
+      sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY 
stringField"),
+      (1 to 10).map(i => Row(1, s"part-$i")))
+  }
+
+  test("create temporary orc table as") {
+    checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_source"),
+      (1 to 10).map(i => Row(i, s"part-$i")))
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
+      (6 to 10).map(i => Row(i, s"part-$i")))
+
+    checkAnswer(
+      sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY 
stringField"),
+      (1 to 10).map(i => Row(1, s"part-$i")))
+  }
+
+  test("appending insert") {
+    sql("INSERT INTO TABLE normal_orc_source SELECT * FROM orc_temp_table 
WHERE intField > 5")
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_source"),
+      (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+        Seq.fill(2)(Row(i, s"part-$i"))
+      })
+  }
+
+  test("overwrite insert") {
+    sql(
+      """INSERT OVERWRITE TABLE normal_orc_as_source
+        |SELECT * FROM orc_temp_table WHERE intField > 5
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM normal_orc_as_source"),
+      (6 to 10).map(i => Row(i, s"part-$i")))
+  }
+
+  test("write null values") {
+    sql("DROP TABLE IF EXISTS orcNullValues")
+
+    val df = sql(
+      """
+        |SELECT
+        |  CAST(null as TINYINT) as c0,
+        |  CAST(null as SMALLINT) as c1,
+        |  CAST(null as INT) as c2,
+        |  CAST(null as BIGINT) as c3,
+        |  CAST(null as FLOAT) as c4,
+        |  CAST(null as DOUBLE) as c5,
+        |  CAST(null as DECIMAL(7,2)) as c6,
+        |  CAST(null as TIMESTAMP) as c7,
+        |  CAST(null as DATE) as c8,
+        |  CAST(null as STRING) as c9,
+        |  CAST(null as VARCHAR(10)) as c10
+        |FROM orc_temp_table limit 1
+      """.stripMargin)
+
+    df.write.format("orc").saveAsTable("orcNullValues")
+
+    checkAnswer(
+      sql("SELECT * FROM orcNullValues"),
+      Row.fromSeq(Seq.fill(11)(null)))
+
+    sql("DROP TABLE IF EXISTS orcNullValues")
+  }
+
+  test("SPARK-18433: Improve DataSource option keys to be more 
case-insensitive") {
+    val conf = spark.sessionState.conf
+    val option = new 
OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
+    assert(option.compressionCodec == "NONE")
+  }
+
+  test("SPARK-21839: Add SQL config for ORC compression") {
+    val conf = spark.sessionState.conf
+    // Test if the default of spark.sql.orc.compression.codec is snappy
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+    // OrcOptions's parameters have a higher priority than SQL configuration.
+    // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
+    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+      val map1 = Map(COMPRESS.getAttribute -> "zlib")
+      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
+      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+    }
+
+    // Test all the valid options of spark.sql.orc.compression.codec
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+      withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+      }
+    }
+  }
+}
+
+class OrcSourceSuite extends OrcSuite with SharedSQLContext {
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      s"""CREATE TABLE normal_orc(
+         |  intField INT,
+         |  stringField STRING
+         |)
+         |USING ORC
+         |LOCATION '${orcTableAsDir.toURI}'
+       """.stripMargin)
+
+    sql(
+      s"""INSERT INTO TABLE normal_orc
+         |SELECT intField, stringField FROM orc_temp_table
+       """.stripMargin)
+
+    spark.sql(
+      s"""CREATE TEMPORARY VIEW normal_orc_source
+         |USING ORC
+         |OPTIONS (
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+         |)
+       """.stripMargin)
+
+    spark.sql(
+      s"""CREATE TEMPORARY VIEW normal_orc_as_source
+         |USING ORC
+         |OPTIONS (
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+         |)
+       """.stripMargin)
+  }
+}
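
As a side note, the SPARK-21839 test above exercises the rule that per-write data source options
take precedence over the session-wide `spark.sql.orc.compression.codec`. A rough sketch of how
that looks from the user side follows; the object name and paths are made up for illustration and
are not part of this commit.

```
import org.apache.spark.sql.SparkSession

// Illustrative demo, not part of the diff: a per-write `compression` option overrides the
// session-wide spark.sql.orc.compression.codec default (SPARK-21839).
object OrcCompressionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("orc-compression-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // Session-wide default codec.
    spark.conf.set("spark.sql.orc.compression.codec", "zlib")
    df.write.mode("overwrite").orc("/tmp/orc_zlib_demo")

    // The data source option wins over the session configuration.
    df.write.mode("overwrite").option("compression", "snappy").orc("/tmp/orc_snappy_demo")

    spark.stop()
  }
}
```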

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
new file mode 100644
index 0000000..d94cb85
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * OrcTest
+ *   -> OrcSuite
+ *       -> OrcSourceSuite
+ *       -> HiveOrcSourceSuite
+ *   -> OrcQueryTest
+ *       -> OrcQuerySuite
+ *       -> HiveOrcQuerySuite
+ *   -> OrcPartitionDiscoveryTest
+ *       -> OrcPartitionDiscoverySuite
+ *       -> HiveOrcPartitionDiscoverySuite
+ *   -> OrcFilterSuite
+ *   -> HiveOrcFilterSuite
+ */
+abstract class OrcTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
+  import testImplicits._
+
+  val orcImp: String = "native"
+
+  private var originalConfORCImplementation = "native"
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    originalConfORCImplementation = conf.getConf(ORC_IMPLEMENTATION)
+    conf.setConf(ORC_IMPLEMENTATION, orcImp)
+  }
+
+  protected override def afterAll(): Unit = {
+    conf.setConf(ORC_IMPLEMENTATION, originalConfORCImplementation)
+    super.afterAll()
+  }
+
+  /**
+   * Writes `data` to an ORC file, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withOrcFile[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: String => Unit): Unit = {
+    withTempPath { file =>
+      sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
+      f(file.getCanonicalPath)
+    }
+  }
+
+  /**
+   * Writes `data` to an ORC file and reads it back as a `DataFrame`,
+   * which is then passed to `f`. The ORC file will be deleted after `f` returns.
+   */
+  protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: DataFrame => Unit): Unit = {
+    withOrcFile(data)(path => f(spark.read.orc(path)))
+  }
+
+  /**
+   * Writes `data` to an ORC file, reads it back as a `DataFrame`, and registers it as a
+   * temporary view named `tableName`, then calls `f`. The temporary view together with the
+   * ORC file will be dropped/deleted after `f` returns.
+   */
+  protected def withOrcTable[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T], tableName: String)
+      (f: => Unit): Unit = {
+    withOrcDataFrame(data) { df =>
+      df.createOrReplaceTempView(tableName)
+      withTempView(tableName)(f)
+    }
+  }
+
+  protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
+      data: Seq[T], path: File): Unit = {
+    data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
+  }
+
+  protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
+      df: DataFrame, path: File): Unit = {
+    df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
+  }
+}
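
The scaladoc on `withOrcFile` / `withOrcDataFrame` / `withOrcTable` above spells out the helper
contract. For illustration only, a hypothetical suite built on these helpers could look like the
sketch below (the suite name and test body are invented, not part of this commit):

```
package org.apache.spark.sql.execution.datasources.orc

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical example reusing the shared helpers; it keeps the default `orcImp` of
// "native", so the round trip exercises the new sql/core ORC reader and writer.
class ExampleOrcRoundTripSuite extends OrcTest with SharedSQLContext {
  test("round-trip a small dataset through ORC") {
    // Tuple columns are named _1 and _2 by toDF() inside the helper.
    withOrcTable((1 to 3).map(i => (i, s"val_$i")), "t") {
      checkAnswer(sql("SELECT _2 FROM t WHERE _1 = 2"), Row("val_2"))
    }
  }
}
```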

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
new file mode 100644
index 0000000..283037c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
+
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.types._
+
+/**
+ * A test suite that tests Hive ORC filter API based filter pushdown optimization.
+ */
+class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton {
+
+  override val orcImp: String = "hive"
+
+  private def checkFilterPredicate(
+      df: DataFrame,
+      predicate: Predicate,
+      checker: (SearchArgument) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
+    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
+    checker(maybeFilter.get)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (implicit df: DataFrame): Unit = {
+    def checkComparisonOperator(filter: SearchArgument) = {
+      val operator = filter.getLeaves.asScala
+      assert(operator.map(_.getOperator).contains(filterOperator))
+    }
+    checkFilterPredicate(df, predicate, checkComparisonOperator)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, stringExpr: String)
+      (implicit df: DataFrame): Unit = {
+    def checkLogicalOperator(filter: SearchArgument) = {
+      assert(filter.toString == stringExpr)
+    }
+    checkFilterPredicate(df, predicate, checkLogicalOperator)
+  }
+
+  private def checkNoFilterPredicate
+      (predicate: Predicate)
+      (implicit df: DataFrame): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[HadoopFsRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters, _) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray)
+    assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters")
+  }
+
+  test("filter pushdown - integer") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - long") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - float") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - double") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - string") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> "1", 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal("1") === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal("1") <=> '_1, 
PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal("2") > '_1, 
PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal("3") < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("1") >= '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("4") <= '_1, 
PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - boolean") {
+    withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === true, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> true, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < true, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= false, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= false, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(false) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(false) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(false) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(true) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(true) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - decimal") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(3)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - timestamp") {
+    val timeString = "2015-08-20 14:57:00"
+    val timestamps = (1 to 4).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      new Timestamp(milliseconds)
+    }
+    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - combinations with logical operators") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      // Because `ExpressionTree` is not accessible in Hive 1.2.x, the result should be checked
+      // in string form in order to verify filter creation including logical operators
+      // such as `and`, `or` or `not`. So, this function uses `SearchArgument.toString()`
+      // to produce a string expression and then compares it to the expected string below.
+      // This might have to be changed after the Hive version is upgraded.
+      checkFilterPredicate(
+        '_1.isNotNull,
+        """leaf-0 = (IS_NULL _1)
+          |expr = (not leaf-0)""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 =!= 1,
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (EQUALS _1 1)
+          |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        !('_1 < 4),
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (LESS_THAN _1 4)
+          |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 < 2 || '_1 > 3,
+        """leaf-0 = (LESS_THAN _1 2)
+          |leaf-1 = (LESS_THAN_EQUALS _1 3)
+          |expr = (or leaf-0 (not leaf-1))""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 < 2 && '_1 > 3,
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (LESS_THAN _1 2)
+          |leaf-2 = (LESS_THAN_EQUALS _1 3)
+          |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim
+      )
+    }
+  }
+
+  test("no filter pushdown - non-supported types") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8)
+    }
+    // ArrayType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
+      checkNoFilterPredicate('_1.isNull)
+    }
+    // BinaryType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
+      checkNoFilterPredicate('_1 <=> 1.b)
+    }
+    // DateType
+    val stringDate = "2015-01-01"
+    withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
+      checkNoFilterPredicate('_1 === Date.valueOf(stringDate))
+    }
+    // MapType
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df =>
+      checkNoFilterPredicate('_1.isNotNull)
+    }
+  }
+
+  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+    import org.apache.spark.sql.sources._
+    // The `LessThan` should be converted while the `StringContains` shouldn't
+    val schema = new StructType(
+      Array(
+        StructField("a", IntegerType, nullable = true),
+        StructField("b", StringType, nullable = true)))
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        LessThan("a", 10),
+        StringContains("b", "prefix")
+      )).get.toString
+    }
+
+    // The `LessThan` should be converted while the whole inner `And` shouldn't
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(schema, Array(
+        LessThan("a", 10),
+        Not(And(
+          GreaterThan("a", 1),
+          StringContains("b", "prefix")
+        ))
+      )).get.toString
+    }
+  }
+}
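
Since `OrcFilters` in sql/hive is package-private, the string-form assertions above have to live
in `org.apache.spark.sql.hive.orc`. A minimal sketch of the same idea, assuming a file placed in
that package on the spark-hive test classpath (the object name is hypothetical, not part of this
commit):

```
package org.apache.spark.sql.hive.orc

import org.apache.spark.sql.sources.{GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical helper, not part of the commit: prints the SearchArgument built for a
// conjunction of data source filters, mirroring the toString-based checks above.
object OrcFilterStringDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Array(StructField("a", IntegerType, nullable = true)))
    val sarg = OrcFilters.createFilter(schema, Array(IsNotNull("a"), GreaterThan("a", 1)))
    // Hive 1.2.x does not expose ExpressionTree, so the rendered string is what gets compared.
    sarg.foreach(s => println(s.toString))
  }
}
```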

http://git-wip-us.apache.org/repos/asf/spark/blob/c1e5688d/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala
new file mode 100644
index 0000000..ab9b492
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import org.apache.spark.sql.execution.datasources.orc.OrcPartitionDiscoveryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveOrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with TestHiveSingleton {
+  override val orcImp: String = "hive"
+}

