Repository: spark
Updated Branches:
  refs/heads/master c19152cd2 -> a9ed51178


http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index ff91a0e..f8117c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -22,8 +22,10 @@ import parquet.filter2.predicate.{FilterPredicate, Operators}
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, QueryTest, SQLConf}
 
 /**
@@ -54,9 +56,17 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
         .select(output.map(e => Column(e)): _*)
         .where(Column(predicate))
 
-      val maybeAnalyzedPredicate = query.queryExecution.executedPlan.collect {
-        case plan: ParquetTableScan => plan.columnPruningPred
-      }.flatten.reduceOption(_ && _)
+      val maybeAnalyzedPredicate = {
+        val forParquetTableScan = query.queryExecution.executedPlan.collect {
+          case plan: ParquetTableScan => plan.columnPruningPred
+        }.flatten.reduceOption(_ && _)
+
+        val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
+          case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
+        }.flatten.reduceOption(_ && _)
+
+        forParquetTableScan.orElse(forParquetDataSource)
+      }
 
       assert(maybeAnalyzedPredicate.isDefined)
       maybeAnalyzedPredicate.foreach { pred =>
@@ -84,213 +94,228 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
     checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
   }
 
-  test("filter pushdown - boolean") {
-    withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { 
implicit rdd =>
-      checkFilterPredicate('_1.isNull,    classOf[Eq   [_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), 
Row(false)))
-
-      checkFilterPredicate('_1 === true, classOf[Eq   [_]], true)
-      checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
+  private def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row])
+      (implicit rdd: DataFrame): Unit = {
+    def checkBinaryAnswer(rdd: DataFrame, expected: Seq[Row]) = {
+      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
+        rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+      }
     }
+
+    checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, expected)
   }
 
-  test("filter pushdown - short") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit 
rdd =>
-      checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq   [_]], 1)
-      checkFilterPredicate(Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 
to 4).map(Row.apply(_)))
-      
-      checkFilterPredicate(Cast('_1, IntegerType) < 2,  classOf[Lt  [_]], 1)
-      checkFilterPredicate(Cast('_1, IntegerType) > 3,  classOf[Gt  [_]], 4)
-      checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
-      
-      checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq  
[_]], 1)
-      checkFilterPredicate(Literal(2) >   Cast('_1, IntegerType), classOf[Lt  
[_]], 1)
-      checkFilterPredicate(Literal(3) <   Cast('_1, IntegerType), classOf[Gt  
[_]], 4)
-      checkFilterPredicate(Literal(1) >=  Cast('_1, IntegerType), 
classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <=  Cast('_1, IntegerType), 
classOf[GtEq[_]], 4)
-      
-      checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate(Cast('_1, IntegerType) > 2 && Cast('_1, 
IntegerType) < 4, 
-        classOf[Operators.And], 3)
-      checkFilterPredicate(Cast('_1, IntegerType) < 2 || Cast('_1, 
IntegerType) > 3, 
-        classOf[Operators.Or],  Seq(Row(1), Row(4)))
-    }
+  private def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte])
+      (implicit rdd: DataFrame): Unit = {
+    checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
   }
 
-  test("filter pushdown - integer") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+  def run(prefix: String): Unit = {
+    test(s"$prefix: filter pushdown - boolean") {
+      withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) 
{ implicit rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), 
Row(false)))
+
+        checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
+        checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
+      }
+    }
+
+    test(s"$prefix: filter pushdown - short") {
+      withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit 
rdd =>
+        checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
+        checkFilterPredicate(
+          Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+        checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
+        checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
+        checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
+        checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
+
+        checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), 
classOf[Eq[_]], 1)
+        checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), 
classOf[Lt[_]], 1)
+        checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), 
classOf[Gt[_]], 4)
+        checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), 
classOf[LtEq[_]], 1)
+        checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), 
classOf[GtEq[_]], 4)
+
+        checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 
4)
+        checkFilterPredicate(
+          Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, 
classOf[Operators.And], 3)
+        checkFilterPredicate(
+          Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
+          classOf[Operators.Or],
+          Seq(Row(1), Row(4)))
+      }
+    }
+
+    test(s"$prefix: filter pushdown - integer") {
+      withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { implicit rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+        checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+        checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
-      checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
-      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+        checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+        checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+        checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
-      checkFilterPredicate(Literal(2) > '_1, classOf[Lt  [_]], 1)
-      checkFilterPredicate(Literal(3) < '_1, classOf[Gt  [_]], 4)
-      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+        checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+        checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+        checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+        checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+        checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
-      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  
Seq(Row(1), Row(4)))
+        checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+        checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+      }
     }
-  }
 
-  test("filter pushdown - long") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit rdd 
=>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+    test(s"$prefix: filter pushdown - long") {
+      withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit 
rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+        checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+        checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 <  2, classOf[Lt[_]], 1)
-      checkFilterPredicate('_1 >  3, classOf[Gt[_]], 4)
-      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+        checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+        checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+        checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
-      checkFilterPredicate(Literal(2) >   '_1, classOf[Lt[_]], 1)
-      checkFilterPredicate(Literal(3) <   '_1, classOf[Gt[_]], 4)
-      checkFilterPredicate(Literal(1) >=  '_1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <=  '_1, classOf[GtEq[_]], 4)
+        checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+        checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+        checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+        checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+        checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
-      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  
Seq(Row(1), Row(4)))
+        checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+        checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+      }
     }
-  }
 
-  test("filter pushdown - float") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit 
rdd =>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+    test(s"$prefix: filter pushdown - float") {
+      withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit 
rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+        checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+        checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 <  2, classOf[Lt[_]], 1)
-      checkFilterPredicate('_1 >  3, classOf[Gt[_]], 4)
-      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+        checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+        checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+        checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
-      checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
-      checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
-      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+        checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+        checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+        checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+        checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+        checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
-      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  
Seq(Row(1), Row(4)))
+        checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+        checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+      }
     }
-  }
 
-  test("filter pushdown - double") {
-    withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit 
rdd =>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+    test(s"$prefix: filter pushdown - double") {
+      withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit 
rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
-      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+        checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+        checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
 
-      checkFilterPredicate('_1 <  2, classOf[Lt[_]], 1)
-      checkFilterPredicate('_1 >  3, classOf[Gt[_]], 4)
-      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+        checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+        checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+        checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(Literal(1) === '_1, classOf[Eq  [_]], 1)
-      checkFilterPredicate(Literal(2) > '_1, classOf[Lt  [_]], 1)
-      checkFilterPredicate(Literal(3) < '_1, classOf[Gt  [_]], 4)
-      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+        checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+        checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+        checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+        checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+        checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
-      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
-      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or],  
Seq(Row(1), Row(4)))
+        checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+        checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+        checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+      }
     }
-  }
 
-  test("filter pushdown - string") {
-    withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
-      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkFilterPredicate(
-        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.toString)))
-
-      checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
-      checkFilterPredicate('_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.toString)))
-
-      checkFilterPredicate('_1 <  "2", classOf[Lt[_]], "1")
-      checkFilterPredicate('_1 >  "3", classOf[Gt[_]], "4")
-      checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
-      checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
-
-      checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
-      checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
-      checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
-      checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
-      checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
-
-      checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
-      checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
-      checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], 
Seq(Row("1"), Row("4")))
+    test(s"$prefix: filter pushdown - string") {
+      withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { implicit rdd =>
+        checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkFilterPredicate(
+          '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.toString)))
+
+        checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
+        checkFilterPredicate(
+          '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.toString)))
+
+        checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1")
+        checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4")
+        checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
+        checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
+
+        checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
+        checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
+        checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
+        checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
+        checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
+
+        checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
+        checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], 
"3")
+        checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], 
Seq(Row("1"), Row("4")))
+      }
     }
-  }
 
-  def checkBinaryFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Seq[Row])
-      (implicit rdd: DataFrame): Unit = {
-    def checkBinaryAnswer(rdd: DataFrame, expected: Seq[Row]) = {
-      
assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).toSeq.sorted) {
-        rdd.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+    test(s"$prefix: filter pushdown - binary") {
+      implicit class IntToBinary(int: Int) {
+        def b: Array[Byte] = int.toString.getBytes("UTF-8")
       }
-    }
 
-    checkFilterPredicate(rdd, predicate, filterClass, checkBinaryAnswer _, 
expected)
-  }
+      withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
+        checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
 
-  def checkBinaryFilterPredicate
-      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Array[Byte])
-      (implicit rdd: DataFrame): Unit = {
-    checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(rdd)
-  }
+        checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+        checkBinaryFilterPredicate(
+          '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.b)).toSeq)
 
-  test("filter pushdown - binary") {
-    implicit class IntToBinary(int: Int) {
-      def b: Array[Byte] = int.toString.getBytes("UTF-8")
-    }
+        checkBinaryFilterPredicate(
+          '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.b)).toSeq)
+
+        checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b)
+        checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt[_]], 4.b)
+        checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
+        checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
 
-    withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { implicit rdd =>
-      checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
-      checkBinaryFilterPredicate(
-        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.b)).toSeq)
-
-      checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq   [_]], 1.b)
-      checkBinaryFilterPredicate(
-        '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.b)).toSeq)
-
-      checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b)
-      checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt[_]], 4.b)
-      checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
-      checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
-
-      checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
-      checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
-      checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
-      checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
-      checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
-
-      checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
-      checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, 
classOf[Operators.And], 3.b)
-      checkBinaryFilterPredicate(
-        '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
+        checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
+        checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
+        checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
+        checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
+        checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
+
+        checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
+        checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, 
classOf[Operators.And], 3.b)
+        checkBinaryFilterPredicate(
+          '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), 
Row(4.b)))
+      }
     }
   }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+    run("Parquet data source enabled")
+  }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
+    run("Parquet data source disabled")
+  }
 }
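
Note on the structure above: every test body now lives inside run(prefix), and the suite invokes run twice under SQLConf.PARQUET_USE_DATA_SOURCE_API so the same filter-pushdown assertions exercise both the old ParquetTableScan path and the new ParquetRelation2 data source. In the old path the pushed-down predicate is read from ParquetTableScan.columnPruningPred in the executed plan; in the data source path it is recovered by matching PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) against the optimized plan, and the two options are combined with orElse. Below is a minimal, self-contained sketch of the set-conf/run/restore pattern; ConfStore, withConf and the literal key string are stand-ins for illustration, not Spark's actual SQLConf API.

    import scala.collection.mutable

    // Hypothetical stand-in for a SQL configuration store.
    object ConfStore {
      private val settings = mutable.Map.empty[String, String]
      def get(key: String): Option[String] = settings.get(key)
      def set(key: String, value: String): Unit = settings(key) = value
      def unset(key: String): Unit = settings -= key
    }

    // Temporarily sets `key` to `value`, runs `body`, then restores the previous value.
    def withConf(key: String, value: String)(body: => Unit): Unit = {
      val previous = ConfStore.get(key)
      ConfStore.set(key, value)
      try body finally {
        previous match {
          case Some(old) => ConfStore.set(key, old)
          case None => ConfStore.unset(key)
        }
      }
    }

    // Register the same suite of tests twice, with a prefix recording the active mode.
    def run(prefix: String): Unit =
      println(s"$prefix: filter pushdown tests would be registered here")

    // Key name assumed for illustration only.
    withConf("spark.sql.parquet.useDataSourceApi", "true")(run("Parquet data source enabled"))
    withConf("spark.sql.parquet.useDataSourceApi", "false")(run("Parquet data source disabled"))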

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 0bc246c..c8ebbbc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -73,218 +73,229 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     withParquetRDD(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
   }
 
-  test("basic data types (without binary)") {
-    val data = (1 to 4).map { i =>
-      (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+  def run(prefix: String): Unit = {
+    test(s"$prefix: basic data types (without binary)") {
+      val data = (1 to 4).map { i =>
+        (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+      }
+      checkParquetFile(data)
     }
-    checkParquetFile(data)
-  }
 
-  test("raw binary") {
-    val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
-    withParquetRDD(data) { rdd =>
-      assertResult(data.map(_._1.mkString(",")).sorted) {
-        rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
+    test(s"$prefix: raw binary") {
+      val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
+      withParquetRDD(data) { rdd =>
+        assertResult(data.map(_._1.mkString(",")).sorted) {
+          rdd.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
+        }
       }
     }
-  }
-
-  test("string") {
-    val data = (1 to 4).map(i => Tuple1(i.toString))
-    // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet 
files written by Spark SQL
-    // as we store Spark SQL schema in the extra metadata.
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> 
"false")(checkParquetFile(data))
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> 
"true")(checkParquetFile(data))
-  }
 
-  test("fixed-length decimals") {
-    import org.apache.spark.sql.test.TestSQLContext.implicits._
-
-    def makeDecimalRDD(decimal: DecimalType): DataFrame =
-      sparkContext
-        .parallelize(0 to 1000)
-        .map(i => Tuple1(i / 100.0))
-        .select($"_1" cast decimal as "abcd")
+    test(s"$prefix: string") {
+      val data = (1 to 4).map(i => Tuple1(i.toString))
+      // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
+      // as we store Spark SQL schema in the extra metadata.
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+    }
 
-    for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 
17))) {
-      withTempPath { dir =>
-        val data = makeDecimalRDD(DecimalType(precision, scale))
-        data.saveAsParquetFile(dir.getCanonicalPath)
-        checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+    test(s"$prefix: fixed-length decimals") {
+      import org.apache.spark.sql.test.TestSQLContext.implicits._
+
+      def makeDecimalRDD(decimal: DecimalType): DataFrame =
+        sparkContext
+          .parallelize(0 to 1000)
+          .map(i => Tuple1(i / 100.0))
+          // Parquet doesn't allow column names with spaces, have to add an alias here
+          .select($"_1" cast decimal as "dec")
+
+      for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
+        withTempPath { dir =>
+          val data = makeDecimalRDD(DecimalType(precision, scale))
+          data.saveAsParquetFile(dir.getCanonicalPath)
+          checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+        }
       }
-    }
 
-    // Decimals with precision above 18 are not yet supported
-    intercept[RuntimeException] {
-      withTempPath { dir =>
-        makeDecimalRDD(DecimalType(19, 
10)).saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).collect()
+      // Decimals with precision above 18 are not yet supported
+      intercept[RuntimeException] {
+        withTempPath { dir =>
+          makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
+          parquetFile(dir.getCanonicalPath).collect()
+        }
       }
-    }
 
-    // Unlimited-length decimals are not yet supported
-    intercept[RuntimeException] {
-      withTempPath { dir =>
-        
makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).collect()
+      // Unlimited-length decimals are not yet supported
+      intercept[RuntimeException] {
+        withTempPath { dir =>
+          makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
+          parquetFile(dir.getCanonicalPath).collect()
+        }
       }
     }
-  }
 
-  test("map") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
-    checkParquetFile(data)
-  }
+    test(s"$prefix: map") {
+      val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
+      checkParquetFile(data)
+    }
 
-  test("array") {
-    val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
-    checkParquetFile(data)
-  }
+    test(s"$prefix: array") {
+      val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
+      checkParquetFile(data)
+    }
 
-  test("struct") {
-    val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
-    withParquetRDD(data) { rdd =>
-      // Structs are converted to `Row`s
-      checkAnswer(rdd, data.map { case Tuple1(struct) =>
-        Row(Row(struct.productIterator.toSeq: _*))
-      })
+    test(s"$prefix: struct") {
+      val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
+      withParquetRDD(data) { rdd =>
+        // Structs are converted to `Row`s
+        checkAnswer(rdd, data.map { case Tuple1(struct) =>
+          Row(Row(struct.productIterator.toSeq: _*))
+        })
+      }
     }
-  }
 
-  test("nested struct with array of array as field") {
-    val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
-    withParquetRDD(data) { rdd =>
-      // Structs are converted to `Row`s
-      checkAnswer(rdd, data.map { case Tuple1(struct) =>
-        Row(Row(struct.productIterator.toSeq: _*))
-      })
+    test(s"$prefix: nested struct with array of array as field") {
+      val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
+      withParquetRDD(data) { rdd =>
+        // Structs are converted to `Row`s
+        checkAnswer(rdd, data.map { case Tuple1(struct) =>
+          Row(Row(struct.productIterator.toSeq: _*))
+        })
+      }
     }
-  }
 
-  test("nested map with struct as value type") {
-    val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
-    withParquetRDD(data) { rdd =>
-      checkAnswer(rdd, data.map { case Tuple1(m) =>
-        Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
-      })
+    test(s"$prefix: nested map with struct as value type") {
+      val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
+      withParquetRDD(data) { rdd =>
+        checkAnswer(rdd, data.map { case Tuple1(m) =>
+          Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+        })
+      }
     }
-  }
 
-  test("nulls") {
-    val allNulls = (
-      null.asInstanceOf[java.lang.Boolean],
-      null.asInstanceOf[Integer],
-      null.asInstanceOf[java.lang.Long],
-      null.asInstanceOf[java.lang.Float],
-      null.asInstanceOf[java.lang.Double])
-
-    withParquetRDD(allNulls :: Nil) { rdd =>
-      val rows = rdd.collect()
-      assert(rows.size === 1)
-      assert(rows.head === Row(Seq.fill(5)(null): _*))
+    test(s"$prefix: nulls") {
+      val allNulls = (
+        null.asInstanceOf[java.lang.Boolean],
+        null.asInstanceOf[Integer],
+        null.asInstanceOf[java.lang.Long],
+        null.asInstanceOf[java.lang.Float],
+        null.asInstanceOf[java.lang.Double])
+
+      withParquetRDD(allNulls :: Nil) { rdd =>
+        val rows = rdd.collect()
+        assert(rows.size === 1)
+        assert(rows.head === Row(Seq.fill(5)(null): _*))
+      }
     }
-  }
 
-  test("nones") {
-    val allNones = (
-      None.asInstanceOf[Option[Int]],
-      None.asInstanceOf[Option[Long]],
-      None.asInstanceOf[Option[String]])
+    test(s"$prefix: nones") {
+      val allNones = (
+        None.asInstanceOf[Option[Int]],
+        None.asInstanceOf[Option[Long]],
+        None.asInstanceOf[Option[String]])
 
-    withParquetRDD(allNones :: Nil) { rdd =>
-      val rows = rdd.collect()
-      assert(rows.size === 1)
-      assert(rows.head === Row(Seq.fill(3)(null): _*))
+      withParquetRDD(allNones :: Nil) { rdd =>
+        val rows = rdd.collect()
+        assert(rows.size === 1)
+        assert(rows.head === Row(Seq.fill(3)(null): _*))
+      }
     }
-  }
 
-  test("compression codec") {
-    def compressionCodecFor(path: String) = {
-      val codecs = ParquetTypesConverter
-        .readMetaData(new Path(path), Some(configuration))
-        .getBlocks
-        .flatMap(_.getColumns)
-        .map(_.getCodec.name())
-        .distinct
-
-      assert(codecs.size === 1)
-      codecs.head
-    }
+    test(s"$prefix: compression codec") {
+      def compressionCodecFor(path: String) = {
+        val codecs = ParquetTypesConverter
+          .readMetaData(new Path(path), Some(configuration))
+          .getBlocks
+          .flatMap(_.getColumns)
+          .map(_.getCodec.name())
+          .distinct
+
+        assert(codecs.size === 1)
+        codecs.head
+      }
 
-    val data = (0 until 10).map(i => (i, i.toString))
+      val data = (0 until 10).map(i => (i, i.toString))
 
-    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
-      withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
-        withParquetFile(data) { path =>
-          assertResult(conf.parquetCompressionCodec.toUpperCase) {
-            compressionCodecFor(path)
+      def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+        withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+          withParquetFile(data) { path =>
+            assertResult(conf.parquetCompressionCodec.toUpperCase) {
+              compressionCodecFor(path)
+            }
           }
         }
       }
-    }
 
-    // Checks default compression codec
-    checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
+      // Checks default compression codec
+      checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
 
-    checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
-    checkCompressionCodec(CompressionCodecName.GZIP)
-    checkCompressionCodec(CompressionCodecName.SNAPPY)
-  }
+      checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+      checkCompressionCodec(CompressionCodecName.GZIP)
+      checkCompressionCodec(CompressionCodecName.SNAPPY)
+    }
 
-  test("read raw Parquet file") {
-    def makeRawParquetFile(path: Path): Unit = {
-      val schema = MessageTypeParser.parseMessageType(
-        """
-          |message root {
-          |  required boolean _1;
-          |  required int32   _2;
-          |  required int64   _3;
-          |  required float   _4;
-          |  required double  _5;
-          |}
-        """.stripMargin)
-
-      val writeSupport = new TestGroupWriteSupport(schema)
-      val writer = new ParquetWriter[Group](path, writeSupport)
-
-      (0 until 10).foreach { i =>
-        val record = new SimpleGroup(schema)
-        record.add(0, i % 2 == 0)
-        record.add(1, i)
-        record.add(2, i.toLong)
-        record.add(3, i.toFloat)
-        record.add(4, i.toDouble)
-        writer.write(record)
-      }
+    test(s"$prefix: read raw Parquet file") {
+      def makeRawParquetFile(path: Path): Unit = {
+        val schema = MessageTypeParser.parseMessageType(
+          """
+            |message root {
+            |  required boolean _1;
+            |  required int32   _2;
+            |  required int64   _3;
+            |  required float   _4;
+            |  required double  _5;
+            |}
+          """.stripMargin)
+
+        val writeSupport = new TestGroupWriteSupport(schema)
+        val writer = new ParquetWriter[Group](path, writeSupport)
+
+        (0 until 10).foreach { i =>
+          val record = new SimpleGroup(schema)
+          record.add(0, i % 2 == 0)
+          record.add(1, i)
+          record.add(2, i.toLong)
+          record.add(3, i.toFloat)
+          record.add(4, i.toDouble)
+          writer.write(record)
+        }
 
-      writer.close()
-    }
+        writer.close()
+      }
 
-    withTempDir { dir =>
-      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
-      makeRawParquetFile(path)
-      checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
-        Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
-      })
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+        makeRawParquetFile(path)
+        checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+          Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+        })
+      }
     }
-  }
 
-  test("write metadata") {
-    withTempPath { file =>
-      val path = new Path(file.toURI.toString)
-      val fs = FileSystem.getLocal(configuration)
-      val attributes = ScalaReflection.attributesFor[(Int, String)]
-      ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+    test(s"$prefix: write metadata") {
+      withTempPath { file =>
+        val path = new Path(file.toURI.toString)
+        val fs = FileSystem.getLocal(configuration)
+        val attributes = ScalaReflection.attributesFor[(Int, String)]
+        ParquetTypesConverter.writeMetaData(attributes, path, configuration)
 
-      assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
-      assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_METADATA_FILE)))
+        assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
+        assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val metaData = ParquetTypesConverter.readMetaData(path, 
Some(configuration))
-      val actualSchema = metaData.getFileMetaData.getSchema
-      val expectedSchema = 
ParquetTypesConverter.convertFromAttributes(attributes)
+        val metaData = ParquetTypesConverter.readMetaData(path, 
Some(configuration))
+        val actualSchema = metaData.getFileMetaData.getSchema
+        val expectedSchema = 
ParquetTypesConverter.convertFromAttributes(attributes)
 
-      actualSchema.checkContains(expectedSchema)
-      expectedSchema.checkContains(actualSchema)
+        actualSchema.checkContains(expectedSchema)
+        expectedSchema.checkContains(actualSchema)
+      }
     }
   }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+    run("Parquet data source enabled")
+  }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
+    run("Parquet data source disabled")
+  }
 }
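
A note on the fixed-length decimal test above: only precisions up to 18 are expected to round-trip, while DecimalType(19, 10) and DecimalType.Unlimited must throw. At the time of this commit the Parquet writer stored a decimal's unscaled value out of a Long into a fixed-length byte array, and 18 digits is the largest precision whose unscaled values always fit in a signed 64-bit Long. A quick, illustrative check of that bound:

    // The largest unscaled value for precision p is 10^p - 1; it must fit in a Long.
    def fitsInLong(precision: Int): Boolean =
      BigInt(10).pow(precision) - 1 <= BigInt(Long.MaxValue)

    assert((1 to 18).forall(fitsInLong)) // an 18-digit unscaled value is below 2^63 - 1
    assert(!fitsInLong(19))              // 10^19 - 1 no longer fits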

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
new file mode 100644
index 0000000..ae606d1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.parquet
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.parquet.ParquetRelation2._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLContext}
+
+class ParquetPartitionDiscoverySuite extends FunSuite with ParquetTest {
+  override val sqlContext: SQLContext = TestSQLContext
+
+  val defaultPartitionName = "__NULL__"
+
+  test("column type inference") {
+    def check(raw: String, literal: Literal): Unit = {
+      assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
+    }
+
+    check("10", Literal(10, IntegerType))
+    check("1000000000000000", Literal(1000000000000000L, LongType))
+    check("1.5", Literal(1.5, FloatType))
+    check("hello", Literal("hello", StringType))
+    check(defaultPartitionName, Literal(null, NullType))
+  }
+
+  test("parse partition") {
+    def check(path: String, expected: PartitionValues): Unit = {
+      assert(expected === parsePartition(new Path(path), defaultPartitionName))
+    }
+
+    def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
+      val message = intercept[T] {
+        parsePartition(new Path(path), defaultPartitionName)
+      }.getMessage
+
+      assert(message.contains(expected))
+    }
+
+    check(
+      "file:///",
+      PartitionValues(
+        ArrayBuffer.empty[String],
+        ArrayBuffer.empty[Literal]))
+
+    check(
+      "file://path/a=10",
+      PartitionValues(
+        ArrayBuffer("a"),
+        ArrayBuffer(Literal(10, IntegerType))))
+
+    check(
+      "file://path/a=10/b=hello/c=1.5",
+      PartitionValues(
+        ArrayBuffer("a", "b", "c"),
+        ArrayBuffer(
+          Literal(10, IntegerType),
+          Literal("hello", StringType),
+          Literal(1.5, FloatType))))
+
+    check(
+      "file://path/a=10/b_hello/c=1.5",
+      PartitionValues(
+        ArrayBuffer("c"),
+        ArrayBuffer(Literal(1.5, FloatType))))
+
+    checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
+    checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
+  }
+
+  test("parse partitions") {
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
+    }
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", IntegerType),
+          StructField("b", StringType))),
+        Seq(Partition(Row(10, "hello"), "hdfs://host:9000/path/a=10/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+      s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+          Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+  }
+}
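
The expectations above encode the Hive-style layout: each key=value directory level contributes a partition column, a value equal to the default partition name becomes null, and column types are inferred from the values (integer, then long, then floating point, falling back to string). The sketch below is a deliberate simplification for illustration only, not ParquetRelation2's implementation, which parses from the leaf directory upward (hence only "c" survives in the a=10/b_hello/c=1.5 case) and resolves types across all paths.

    import scala.util.Try

    // Parses the key=value segments of a single partition path. Values equal to
    // `defaultName` become null; otherwise try Int, then Long, then Double, else String.
    def parsePartitionPath(path: String, defaultName: String): Seq[(String, Any)] =
      path.split("/").filter(_.contains("=")).toSeq.map { segment =>
        val Array(name, raw) = segment.split("=", 2)
        require(name.nonEmpty, "Empty partition column name")
        require(raw.nonEmpty, "Empty partition column value")
        val value: Any =
          if (raw == defaultName) null
          else Try(raw.toInt).orElse(Try(raw.toLong)).orElse(Try(raw.toDouble)).getOrElse(raw)
        name -> value
      }

    // parsePartitionPath("hdfs://host:9000/path/a=10/b=hello", "__NULL__")
    //   returns Seq(("a", 10), ("b", "hello"))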

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 5ec7a15..48c7598 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.parquet
 
-import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.sql.{QueryTest, SQLConf}
 
 /**
  * A test suite that tests various Parquet queries.
@@ -28,82 +28,93 @@ import org.apache.spark.sql.test.TestSQLContext._
 class ParquetQuerySuite extends QueryTest with ParquetTest {
   val sqlContext = TestSQLContext
 
-  test("simple projection") {
-    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
-      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+  def run(prefix: String): Unit = {
+    test(s"$prefix: simple projection") {
+      withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+        checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+      }
     }
-  }
 
-  test("appending") {
-    val data = (0 until 10).map(i => (i, i.toString))
-    withParquetTable(data, "t") {
-      sql("INSERT INTO TABLE t SELECT * FROM t")
-      checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
+    // TODO Re-enable this after data source insertion API is merged
+    test(s"$prefix: appending") {
+      val data = (0 until 10).map(i => (i, i.toString))
+      withParquetTable(data, "t") {
+        sql("INSERT INTO TABLE t SELECT * FROM t")
+        checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
+      }
     }
-  }
 
-  // This test case will trigger the NPE mentioned in
-  // https://issues.apache.org/jira/browse/PARQUET-151.
-  ignore("overwriting") {
-    val data = (0 until 10).map(i => (i, i.toString))
-    withParquetTable(data, "t") {
-      sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
-      checkAnswer(table("t"), data.map(Row.fromTuple))
+    // This test case will trigger the NPE mentioned in
+    // https://issues.apache.org/jira/browse/PARQUET-151.
+    ignore(s"$prefix: overwriting") {
+      val data = (0 until 10).map(i => (i, i.toString))
+      withParquetTable(data, "t") {
+        sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
+        checkAnswer(table("t"), data.map(Row.fromTuple))
+      }
     }
-  }
 
-  test("self-join") {
-    // 4 rows, cells of column 1 of row 2 and row 4 are null
-    val data = (1 to 4).map { i =>
-      val maybeInt = if (i % 2 == 0) None else Some(i)
-      (maybeInt, i.toString)
-    }
+    test(s"$prefix: self-join") {
+      // 4 rows, cells of column 1 of row 2 and row 4 are null
+      val data = (1 to 4).map { i =>
+        val maybeInt = if (i % 2 == 0) None else Some(i)
+        (maybeInt, i.toString)
+      }
 
-    withParquetTable(data, "t") {
-      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
-      val queryOutput = selfJoin.queryExecution.analyzed.output
+      withParquetTable(data, "t") {
+        val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+        val queryOutput = selfJoin.queryExecution.analyzed.output
 
-      assertResult(4, s"Field count mismatches")(queryOutput.size)
-      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
-        queryOutput.filter(_.name == "_1").map(_.exprId).size
-      }
+        assertResult(4, s"Field count mismatches")(queryOutput.size)
+        assertResult(2, s"Duplicated expression ID in query plan:\n 
$selfJoin") {
+          queryOutput.filter(_.name == "_1").map(_.exprId).size
+        }
 
-      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+        checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+      }
     }
-  }
 
-  test("nested data - struct with array field") {
-    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
-    withParquetTable(data, "t") {
-      checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
-        case Tuple1((_, Seq(string))) => Row(string)
-      })
+    test(s"$prefix: nested data - struct with array field") {
+      val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+      withParquetTable(data, "t") {
+        checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+          case Tuple1((_, Seq(string))) => Row(string)
+        })
+      }
     }
-  }
 
-  test("nested data - array of struct") {
-    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
-    withParquetTable(data, "t") {
-      checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
-        case Tuple1(Seq((_, string))) => Row(string)
-      })
+    test(s"$prefix: nested data - array of struct") {
+      val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+      withParquetTable(data, "t") {
+        checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+          case Tuple1(Seq((_, string))) => Row(string)
+        })
+      }
     }
-  }
 
-  test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
-    withParquetTable((1 to 10).map(Tuple1.apply), "t") {
-      checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
+    test(s"$prefix: SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+      withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+        checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
+      }
     }
-  }
 
-  test("SPARK-5309 strings stored using dictionary compression in parquet") {
-    withParquetTable((0 until 1000).map(i => ("same", "run_" + i /100, 1)), 
"t") {
+    test(s"$prefix: SPARK-5309 strings stored using dictionary compression in 
parquet") {
+      withParquetTable((0 until 1000).map(i => ("same", "run_" + i /100, 1)), 
"t") {
 
-      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
-        (0 until 10).map(i => Row("same", "run_" + i, 100)))
+        checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+          (0 until 10).map(i => Row("same", "run_" + i, 100)))
 
-      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP 
BY _1, _2"),
-        List(Row("same", "run_5", 100)))
+        checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' 
GROUP BY _1, _2"),
+          List(Row("same", "run_5", 100)))
+      }
     }
   }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+    run("Parquet data source enabled")
+  }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
+    run("Parquet data source disabled")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 5f7f31d..2e6c2d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -25,6 +25,7 @@ import parquet.schema.MessageTypeParser
 
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
 
 class ParquetSchemaSuite extends FunSuite with ParquetTest {
   val sqlContext = TestSQLContext
@@ -192,4 +193,40 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       assert(a.nullable === b.nullable)
     }
   }
+
+  test("merge with metastore schema") {
+    // Field type conflict resolution
+    assertResult(
+      StructType(Seq(
+        StructField("lowerCase", StringType),
+        StructField("UPPERCase", DoubleType, nullable = false)))) {
+
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("lowercase", StringType),
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }
+
+    // Conflicting field count
+    assert(intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(
+          StructField("uppercase", DoubleType, nullable = false))),
+
+        StructType(Seq(
+          StructField("lowerCase", BinaryType),
+          StructField("UPPERCase", IntegerType, nullable = true))))
+    }.getMessage.contains("detected conflicting schemas"))
+
+    // Conflicting field names
+    intercept[Throwable] {
+      ParquetRelation2.mergeMetastoreParquetSchema(
+        StructType(Seq(StructField("lower", StringType))),
+        StructType(Seq(StructField("lowerCase", BinaryType))))
+    }
+  }
 }
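
The new merge test pins down the reconciliation rule used when a metastore table is backed by Parquet files: field names are matched case-insensitively, the Parquet schema contributes the case-preserving names, the metastore schema contributes data types and nullability, and mismatched field counts or unmatched names are rejected. A simplified sketch consistent with those expectations (not the actual ParquetRelation2.mergeMetastoreParquetSchema):

    import org.apache.spark.sql.types._

    def mergeSchemas(metastore: StructType, parquet: StructType): StructType = {
      require(metastore.length == parquet.length, "detected conflicting schemas: field counts differ")
      val parquetByLowerName = parquet.map(f => f.name.toLowerCase -> f).toMap
      StructType(metastore.map { m =>
        val p = parquetByLowerName.getOrElse(m.name.toLowerCase,
          sys.error(s"detected conflicting schemas: no Parquet field matches ${m.name}"))
        // Keep Parquet's exact field name, but trust the metastore for type and nullability.
        StructField(p.name, m.dataType, m.nullable)
      })
    }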

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2433106..c78369d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, 
ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -175,10 +176,25 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           Nil
         }
 
-      // Since HiveQL is case insensitive for table names we make them all lowercase.
-      MetastoreRelation(
+      val relation = MetastoreRelation(
         databaseName, tblName, alias)(
           table.getTTable, partitions.map(part => part.getTPartition))(hive)
+
+      if (hive.convertMetastoreParquet &&
+          hive.conf.parquetUseDataSourceApi &&
+          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) {
+        val metastoreSchema = StructType.fromAttributes(relation.output)
+        val paths = if (relation.hiveQlTable.isPartitioned) {
+          relation.hiveQlPartitions.map(p => p.getLocation)
+        } else {
+          Seq(relation.hiveQlTable.getDataLocation.toString)
+        }
+
+        LogicalRelation(ParquetRelation2(
+          paths, Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+      } else {
+        relation
+      }
     }
   }
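
In the hunk above the metastore schema is handed to ParquetRelation2 as a plain string: a StructType serialised to JSON under the METASTORE_SCHEMA option key. This works because a StructType round-trips through its JSON representation, roughly as sketched here (illustrative; the field names are made up):

    import org.apache.spark.sql.types._

    val metastoreSchema = StructType(Seq(
      StructField("key", IntegerType, nullable = false),
      StructField("value", StringType)))

    // Serialise for the options map, then recover the schema on the other side.
    val asJson: String = metastoreSchema.json
    val recovered = DataType.fromJson(asJson)
    assert(recovered.json == asJson)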
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7857a02..95abc36 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -87,7 +87,8 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, predicates, relation: 
MetastoreRelation)
           if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
-             hiveContext.convertMetastoreParquet =>
+             hiveContext.convertMetastoreParquet &&
+             !hiveContext.conf.parquetUseDataSourceApi =>
 
         // Filter out all predicates that only deal with partition keys
         val partitionsKeys = AttributeSet(relation.partitionKeys)
@@ -136,8 +137,10 @@ private[hive] trait HiveStrategies {
               pruningCondition(inputData)
             }
 
+            val partitionLocations = partitions.map(_.getLocation)
+
             hiveContext
-              .parquetFile(partitions.map(_.getLocation).mkString(","))
+              .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
               .addPartitioningAttributes(relation.partitionKeys)
               .lowerCase
               .where(unresolvedOtherPredicates)

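The hunk above also switches from joining all partition locations into a single comma-separated string to the varargs form of parquetFile. A small sketch of the difference, assuming a HiveContext named `hiveContext` and illustrative partition directories:

    // Illustrative locations; the real values come from the Hive metastore partitions.
    val partitionLocations = Seq("/warehouse/t/p=1", "/warehouse/t/p=2", "/warehouse/t/p=3")

    // Old call shape: a single comma-joined path string.
    // hiveContext.parquetFile(partitionLocations.mkString(","))

    // New call shape: head plus tail expanded as varargs, matching the
    // parquetFile(path: String, paths: String*) signature implied by this change.
    val df = hiveContext.parquetFile(partitionLocations.head, partitionLocations.tail: _*)
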
http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 581f666..eae69af 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -28,53 +28,55 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
 
   import sqlContext._
 
-  test("Case insensitive attribute names") {
-    withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
-      val expected = (1 to 4).map(i => Row(i.toString))
-      checkAnswer(sql("SELECT upper FROM cases"), expected)
-      checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+  def run(prefix: String): Unit = {
+    test(s"$prefix: Case insensitive attribute names") {
+      withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
+        val expected = (1 to 4).map(i => Row(i.toString))
+        checkAnswer(sql("SELECT upper FROM cases"), expected)
+        checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+      }
     }
-  }
 
-  test("SELECT on Parquet table") {
-    val data = (1 to 4).map(i => (i, s"val_$i"))
-    withParquetTable(data, "t") {
-      checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
-    }
-  }
-
-  test("Simple column projection + filter on Parquet table") {
-    withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
-      checkAnswer(
-        sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
-        Seq(Row(true, "val_2"), Row(true, "val_4")))
+    test(s"$prefix: SELECT on Parquet table") {
+      val data = (1 to 4).map(i => (i, s"val_$i"))
+      withParquetTable(data, "t") {
+        checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
+      }
     }
-  }
 
-  test("Converting Hive to Parquet Table via saveAsParquetFile") {
-    withTempPath { dir =>
-      sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
-      parquetFile(dir.getCanonicalPath).registerTempTable("p")
-      withTempTable("p") {
+    test(s"$prefix: Simple column projection + filter on Parquet table") {
+      withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
         checkAnswer(
-          sql("SELECT * FROM src ORDER BY key"),
-          sql("SELECT * from p ORDER BY key").collect().toSeq)
+          sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+          Seq(Row(true, "val_2"), Row(true, "val_4")))
       }
     }
-  }
-
 
-  test("INSERT OVERWRITE TABLE Parquet table") {
-    withParquetTable((1 to 4).map(i => (i, s"val_$i")), "t") {
-      withTempPath { file =>
-        sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
-        parquetFile(file.getCanonicalPath).registerTempTable("p")
+    test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
+      withTempPath { dir =>
+        sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+        parquetFile(dir.getCanonicalPath).registerTempTable("p")
         withTempTable("p") {
-          // let's do three overwrites for good measure
-          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-          sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-          checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM t").collect().toSeq)
+          checkAnswer(
+            sql("SELECT * FROM src ORDER BY key"),
+            sql("SELECT * from p ORDER BY key").collect().toSeq)
+        }
+      }
+    }
+
+    // TODO Re-enable this after data source insertion API is merged
+    ignore(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
+      withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
+        withTempPath { file =>
+          sql("SELECT * FROM t LIMIT 
1").saveAsParquetFile(file.getCanonicalPath)
+          parquetFile(file.getCanonicalPath).registerTempTable("p")
+          withTempTable("p") {
+            // let's do three overwrites for good measure
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM 
t").collect().toSeq)
+          }
         }
       }
     }

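The hunk above turns HiveParquetSuite's tests into a `run(prefix)` registrar but does not show where `run` is invoked. Presumably the suite ends by registering every test twice, once per Parquet code path, in the same way parquetSuites.scala does below; a sketch of that assumed tail (the labels and ordering are assumptions, and SQLConf is assumed to be imported):

    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
    run("Parquet data source enabled")

    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
    run("Parquet data source disabled")
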
http://git-wip-us.apache.org/repos/asf/spark/blob/a9ed5117/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 30441bb..a7479a5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -23,7 +23,8 @@ import java.io.File
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 
@@ -79,7 +80,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+      location '${new File(normalTableDir, "normal").getCanonicalPath}'
     """)
 
     (1 to 10).foreach { p =>
@@ -97,7 +98,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
-  test("conversion is working") {
+  test(s"conversion is working") {
     assert(
       sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
         case _: HiveTableScan => true
@@ -105,6 +106,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     assert(
       sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
         case _: ParquetTableScan => true
+        case _: PhysicalRDD => true
       }.nonEmpty)
   }
 }
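For clarity on the check above: with the data source API enabled, the converted relation is planned as a PhysicalRDD rather than a ParquetTableScan, which is why the collect now matches either node. A minimal sketch of telling the two paths apart explicitly, using the imports already present in this file; the helper name is illustrative only.

    // Returns the Parquet scan nodes found in the physical plan of `query`.
    def parquetScanNodes(query: String): Seq[String] =
      sql(query).queryExecution.executedPlan.collect {
        case _: ParquetTableScan => "ParquetTableScan" // old native Parquet path
        case _: PhysicalRDD      => "PhysicalRDD"      // data source API path
      }

    // With SQLConf.PARQUET_USE_DATA_SOURCE_API set to "false" the old scan node
    // appears; with it set to "true" the table shows up as a PhysicalRDD.
    assert(parquetScanNodes("SELECT * FROM normal_parquet").nonEmpty)
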
@@ -147,6 +149,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
  */
 abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
   var partitionedTableDir: File = null
+  var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
 
   import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -156,6 +159,10 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     partitionedTableDir.delete()
     partitionedTableDir.mkdir()
 
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
     (1 to 10).foreach { p =>
       val partDir = new File(partitionedTableDir, s"p=$p")
       sparkContext.makeRDD(1 to 10)
@@ -163,6 +170,11 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-1"))
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
     partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
     partitionedTableDirWithKey.delete()
     partitionedTableDirWithKey.mkdir()
@@ -175,99 +187,107 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
-    test(s"ordering of the partitioning columns $table") {
-      checkAnswer(
-        sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
-        Seq.fill(10)(Row(1, "part-1"))
-      )
-
-      checkAnswer(
-        sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
-        Seq.fill(10)(Row("part-1", 1))
-      )
-    }
-
-    test(s"project the partitioning column $table") {
-      checkAnswer(
-        sql(s"SELECT p, count(*) FROM $table group by p"),
-        Row(1, 10) ::
-          Row(2, 10) ::
-          Row(3, 10) ::
-          Row(4, 10) ::
-          Row(5, 10) ::
-          Row(6, 10) ::
-          Row(7, 10) ::
-          Row(8, 10) ::
-          Row(9, 10) ::
-          Row(10, 10) :: Nil
-      )
-    }
-
-    test(s"project partitioning and non-partitioning columns $table") {
-      checkAnswer(
-        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
-        Row("part-1", 1, 10) ::
-          Row("part-2", 2, 10) ::
-          Row("part-3", 3, 10) ::
-          Row("part-4", 4, 10) ::
-          Row("part-5", 5, 10) ::
-          Row("part-6", 6, 10) ::
-          Row("part-7", 7, 10) ::
-          Row("part-8", 8, 10) ::
-          Row("part-9", 9, 10) ::
-          Row("part-10", 10, 10) :: Nil
-      )
-    }
-
-    test(s"simple count $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table"),
-        Row(100))
+  def run(prefix: String): Unit = {
+    Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table 
=>
+      test(s"$prefix: ordering of the partitioning columns $table") {
+        checkAnswer(
+          sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+          Seq.fill(10)(Row(1, "part-1"))
+        )
+
+        checkAnswer(
+          sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+          Seq.fill(10)(Row("part-1", 1))
+        )
+      }
+
+      test(s"$prefix: project the partitioning column $table") {
+        checkAnswer(
+          sql(s"SELECT p, count(*) FROM $table group by p"),
+          Row(1, 10) ::
+            Row(2, 10) ::
+            Row(3, 10) ::
+            Row(4, 10) ::
+            Row(5, 10) ::
+            Row(6, 10) ::
+            Row(7, 10) ::
+            Row(8, 10) ::
+            Row(9, 10) ::
+            Row(10, 10) :: Nil
+        )
+      }
+
+      test(s"$prefix: project partitioning and non-partitioning columns 
$table") {
+        checkAnswer(
+          sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
+          Row("part-1", 1, 10) ::
+            Row("part-2", 2, 10) ::
+            Row("part-3", 3, 10) ::
+            Row("part-4", 4, 10) ::
+            Row("part-5", 5, 10) ::
+            Row("part-6", 6, 10) ::
+            Row("part-7", 7, 10) ::
+            Row("part-8", 8, 10) ::
+            Row("part-9", 9, 10) ::
+            Row("part-10", 10, 10) :: Nil
+        )
+      }
+
+      test(s"$prefix: simple count $table") {
+        checkAnswer(
+          sql(s"SELECT COUNT(*) FROM $table"),
+          Row(100))
+      }
+
+      test(s"$prefix: pruned count $table") {
+        checkAnswer(
+          sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+          Row(10))
+      }
+
+      test(s"$prefix: non-existent partition $table") {
+        checkAnswer(
+          sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+          Row(0))
+      }
+
+      test(s"$prefix: multi-partition pruned count $table") {
+        checkAnswer(
+          sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+          Row(30))
+      }
+
+      test(s"$prefix: non-partition predicates $table") {
+        checkAnswer(
+          sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+          Row(30))
+      }
+
+      test(s"$prefix: sum $table") {
+        checkAnswer(
+          sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND 
p = 1"),
+          Row(1 + 2 + 3))
+      }
+
+      test(s"$prefix: hive udfs $table") {
+        checkAnswer(
+          sql(s"SELECT concat(stringField, stringField) FROM $table"),
+          sql(s"SELECT stringField FROM $table").map {
+            case Row(s: String) => Row(s + s)
+          }.collect().toSeq)
+      }
     }
 
-    test(s"pruned count $table") {
+    test(s"$prefix: $prefix: non-part select(*)") {
       checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        sql("SELECT COUNT(*) FROM normal_parquet"),
         Row(10))
     }
-
-    test(s"non-existant partition $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
-        Row(0))
-    }
-
-    test(s"multi-partition pruned count $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
-        Row(30))
-    }
-
-    test(s"non-partition predicates $table") {
-      checkAnswer(
-        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
-        Row(30))
-    }
-
-    test(s"sum $table") {
-      checkAnswer(
-        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p 
= 1"),
-        Row(1 + 2 + 3))
-    }
-
-    test(s"hive udfs $table") {
-      checkAnswer(
-        sql(s"SELECT concat(stringField, stringField) FROM $table"),
-        sql(s"SELECT stringField FROM $table").map {
-          case Row(s: String) => Row(s + s)
-        }.collect().toSeq)
-    }
   }
 
-  test("non-part select(*)") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM normal_parquet"),
-      Row(10))
-  }
+  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+  run("Parquet data source enabled")
+
+  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  run("Parquet data source disabled")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
