http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
new file mode 100644
index 0000000..6b62c9a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+trait TestJsonData {
+  // Fixture RDDs of JSON text; each member parallelizes literal records via ctx.sparkContext.
+  protected def ctx: SQLContext
+  // One record with string, integer, long, big-integer, double, boolean and null fields.
+  def primitiveFieldAndType: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"string":"this is a simple string.",
+          "integer":10,
+          "long":21474836470,
+          "bigInteger":92233720368547758070,
+          "double":1.7976931348623157E308,
+          "boolean":true,
+          "null":null
+      }"""  :: Nil)
+  // Four records in which the same field carries different primitive types (or null).
+  def primitiveFieldValueTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
+          "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
+      """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
+          "num_bool":12, "num_str":null, "str_bool":true}""" ::
+      """{"num_num_1":21474836470, "num_num_2":92233720368547758070, 
"num_num_3": 100,
+          "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
+      """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
+          "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" 
:: Nil)
+  // Records whose "headers" field is, in turn, a populated object, {}, "" and null.
+  def jsonNullStruct: RDD[String] =
+    ctx.sparkContext.parallelize(
+      
"""{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}"""
 ::
+        """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
+        """{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
+        """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
+  // Records in which the same field alternates between scalars, arrays, objects and null.
+  def complexFieldValueTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"num_struct":11, "str_array":[1, 2, 3],
+          "array":[], "struct_array":[], "struct": {}}""" ::
+      """{"num_struct":{"field":false}, "str_array":null,
+          "array":null, "struct_array":{}, "struct": null}""" ::
+      """{"num_struct":null, "str_array":"str",
+          "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": 
{"field":null}}""" ::
+      """{"num_struct":{}, "str_array":["str1", "str2", 33],
+          "array":[7], "struct_array":{"field": true}, "struct": {"field": 
"str"}}""" :: Nil)
+  // Arrays whose element types conflict, within one array or across records.
+  def arrayElementTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
+          "array2": [{"field":214748364700}, {"field":1}]}""" ::
+      """{"array3": [{"field":"str"}, {"field":1}]}""" ::
+      """{"array3": [1, 2, 3]}""" :: Nil)
+  // Five records, each defining exactly one of the fields a-e.
+  def missingFields: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"a":true}""" ::
+      """{"b":21474836470}""" ::
+      """{"c":[33, 44]}""" ::
+      """{"d":{"field":true}}""" ::
+      """{"e":"str"}""" :: Nil)
+  // One record with struct fields and arrays of several element types.
+  def complexFieldAndType1: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"struct":{"field1": true, "field2": 92233720368547758070},
+          "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", 
"str2"]},
+          "arrayOfString":["str1", "str2"],
+          "arrayOfInteger":[1, 2147483647, -2147483648],
+          "arrayOfLong":[21474836470, 9223372036854775807, 
-9223372036854775808],
+          "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
+          "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 
2.2250738585072014E-308],
+          "arrayOfBoolean":[true, false, true],
+          "arrayOfNull":[null, null, null, null],
+          "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": 
false}, {"field3": null}],
+          "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
+          "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
+         }"""  :: Nil)
+  // One record with arrays of structs and multiply nested arrays (arrays of arrays of arrays).
+  def complexFieldAndType2: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": 
false}, {"field3": null}],
+          "complexArrayOfStruct": [
+          {
+            "field1": [
+            {
+              "inner1": "str1"
+            },
+            {
+              "inner2": ["str2", "str22"]
+            }],
+            "field2": [[1, 2], [3, 4]]
+          },
+          {
+            "field1": [
+            {
+              "inner2": ["str3", "str33"]
+            },
+            {
+              "inner1": "str4"
+            }],
+            "field2": [[5, 6], [7, 8]]
+          }],
+          "arrayOfArray1": [
+          [
+            [5]
+          ],
+          [
+            [6, 7],
+            [8]
+          ]],
+          "arrayOfArray2": [
+          [
+            [
+              {
+                "inner1": "str1"
+              }
+            ]
+          ],
+          [
+            [],
+            [
+              {"inner2": ["str3", "str33"]},
+              {"inner2": ["str4"], "inner1": "str11"}
+            ]
+          ],
+          [
+            [
+              {"inner3": [[{"inner4": 2}]]}
+            ]
+          ]]
+      }""" :: Nil)
+  // Records whose "map" object maps keys a-e to integers or null.
+  def mapType1: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"map": {"a": 1}}""" ::
+      """{"map": {"b": 2}}""" ::
+      """{"map": {"c": 3}}""" ::
+      """{"map": {"c": 1, "d": 4}}""" ::
+      """{"map": {"e": null}}""" :: Nil)
+  // Records whose "map" object maps keys a-f to objects (field1/field2) or null.
+  def mapType2: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
+      """{"map": {"b": {"field2": 2}}}""" ::
+      """{"map": {"c": {"field1": [], "field2": 4}}}""" ::
+      """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
+      """{"map": {"e": null}}""" ::
+      """{"map": {"f": {"field1": null}}}""" :: Nil)
+  // Records with null elements inside nested arrays.
+  def nullsInArrays: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"field1":[[null], [[["Test"]]]]}""" ::
+      """{"field2":[null, [{"Test":1}]]}""" ::
+      """{"field3":[[null], [{"Test":"2"}]]}""" ::
+      """{"field4":[[null, [1,2,3]]]}""" :: Nil)
+  // Records that are top-level JSON arrays (one of them empty) plus one plain object.
+  def jsonArray: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """[{"a":"str_a_1"}]""" ::
+      """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
+      """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
+      """[]""" :: Nil)
+  // Mostly malformed records ("{", "", unquoted keys, "]") with one well-formed object.
+  def corruptRecords: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{""" ::
+      """""" ::
+      """{"a":1, b:2}""" ::
+      """{"a":{, b:3}""" ::
+      """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
+      """]""" :: Nil)
+  // Malformed records ("{", "", "]") mixed with records holding empty objects.
+  def emptyRecords: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{""" ::
+        """""" ::
+        """{"a": {}}""" ::
+        """{"a": {"b": {}}}""" ::
+        """{"b": [{"c": {}}]}""" ::
+        """]""" :: Nil)
+  // A single record {"a":123}; unlike the defs above this is a lazy val, built on first access.
+  lazy val singleRow: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"a":123}""" :: Nil)
+  // An RDD with no records.
+  def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]())
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
new file mode 100644
index 0000000..4d9c07b
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.ByteBuffer
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+
+import org.apache.spark.sql.execution.datasources.parquet.test.avro.{Nested, 
ParquetAvroCompat}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * Checks that Spark SQL can read a Parquet file written by parquet-avro.
+ *
+ * `beforeAll` writes ten `ParquetAvroCompat` records (i = 0 until 10) to `parquetStore`; the
+ * test reads them back and compares them with rows built from the same per-`i` formulas.
+ */
+class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetCompatibilityTest._
+
+  override val sqlContext: SQLContext = TestSQLContext
+
+  /** Writes the ten fixture records to `parquetStore` through an `AvroParquetWriter`. */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val writer =
+      new AvroParquetWriter[ParquetAvroCompat](
+        new Path(parquetStore.getCanonicalPath),
+        ParquetAvroCompat.getClassSchema)
+
+    // Close the writer even when a write fails, so the open file is not leaked.
+    try {
+      (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
+    } finally {
+      writer.close()
+    }
+  }
+
+  test("Read Parquet file generated by parquet-avro") {
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-avro:
+         |${readParquetSchema(parquetStore.getCanonicalPath)}
+       """.stripMargin)
+
+    checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
+      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+      Row(
+        // Required primitive, binary and string columns.
+        i % 2 == 0,
+        i,
+        i.toLong * 10,
+        i.toFloat + 0.1f,
+        i.toDouble + 0.2d,
+        s"val_$i".getBytes,
+        s"val_$i",
+
+        // Optional counterparts: null whenever i % 3 == 0 (see makeNullable).
+        nullable(i % 2 == 0: java.lang.Boolean),
+        nullable(i: Integer),
+        nullable(i.toLong: java.lang.Long),
+        nullable(i.toFloat + 0.1f: java.lang.Float),
+        nullable(i.toDouble + 0.2d: java.lang.Double),
+        nullable(s"val_$i".getBytes),
+        nullable(s"val_$i"),
+
+        // String array, string-to-int map, and map of string to a list of nested structs.
+        Seq.tabulate(3)(n => s"arr_${i + n}"),
+        Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap,
+        Seq.tabulate(3) { n =>
+          (i + n).toString -> Seq.tabulate(3) { m =>
+            Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
+          }
+        }.toMap)
+    })
+  }
+
+  /**
+   * Builds the Avro record for index `i`, using the same formulas the test expects.
+   *
+   * @param i record index; every "maybe" column is null when `i % 3 == 0`
+   * @return a fully populated `ParquetAvroCompat` record
+   */
+  def makeParquetAvroCompat(i: Int): ParquetAvroCompat = {
+    def nullable[T <: AnyRef] = makeNullable[T](i) _
+
+    // Map with three keys (i, i + 1, i + 2 as strings), each holding three Nested records.
+    def makeComplexColumn(i: Int): JMap[String, JList[Nested]] = {
+      mapAsJavaMap(Seq.tabulate(3) { n =>
+        (i + n).toString -> seqAsJavaList(Seq.tabulate(3) { m =>
+          Nested
+            .newBuilder()
+            .setNestedIntsColumn(seqAsJavaList(Seq.tabulate(3)(j => i + j + m)))
+            .setNestedStringColumn(s"val_${i + m}")
+            .build()
+        })
+      }.toMap)
+    }
+
+    ParquetAvroCompat
+      .newBuilder()
+      .setBoolColumn(i % 2 == 0)
+      .setIntColumn(i)
+      .setLongColumn(i.toLong * 10)
+      .setFloatColumn(i.toFloat + 0.1f)
+      .setDoubleColumn(i.toDouble + 0.2d)
+      .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
+      .setStringColumn(s"val_$i")
+
+      .setMaybeBoolColumn(nullable(i % 2 == 0: java.lang.Boolean))
+      .setMaybeIntColumn(nullable(i: Integer))
+      .setMaybeLongColumn(nullable(i.toLong: java.lang.Long))
+      .setMaybeFloatColumn(nullable(i.toFloat + 0.1f: java.lang.Float))
+      .setMaybeDoubleColumn(nullable(i.toDouble + 0.2d: java.lang.Double))
+      .setMaybeBinaryColumn(nullable(ByteBuffer.wrap(s"val_$i".getBytes)))
+      .setMaybeStringColumn(nullable(s"val_$i"))
+
+      .setStringsColumn(Seq.tabulate(3)(n => s"arr_${i + n}"))
+      .setStringToIntColumn(
+        mapAsJavaMap(Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap))
+      .setComplexColumn(makeComplexColumn(i))
+
+      .build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
new file mode 100644
index 0000000..68f35b1
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.schema.MessageType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.util.Utils
+
+/**
+ * Base class for Parquet compatibility suites: it manages a per-suite temporary location
+ * (`parquetStore`) and offers a helper that reads the schema of Parquet files stored there.
+ */
+abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
+  /** Location of the Parquet data under test; assigned in `beforeAll`. */
+  protected var parquetStore: File = _
+
+  /**
+   * Optional path to a staging subdirectory which may be created during query processing
+   * (Hive does this).
+   * Parquet files under this directory will be ignored in [[readParquetSchema()]]
+   * @return an optional staging directory to ignore when scanning for parquet files.
+   */
+  protected def stagingDir: Option[String] = None
+
+  /** Reserves a fresh temporary path for the suite and stores it in `parquetStore`. */
+  override protected def beforeAll(): Unit = {
+    // Keep the stackable-trait chain intact so other mixed-in beforeAll hooks still run.
+    super.beforeAll()
+    parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_")
+    // The directory itself is removed again, leaving only its unique path
+    // (presumably because the writers used by subclasses create the target themselves).
+    parquetStore.delete()
+  }
+
+  /** Removes whatever was written under `parquetStore`, then runs the inherited cleanup. */
+  override protected def afterAll(): Unit = {
+    try {
+      // parquetStore is still null if beforeAll failed before assigning it.
+      Option(parquetStore).foreach(dir => Utils.deleteRecursively(dir))
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  /**
+   * Reads the Parquet schema from the first footer returned for the entries directly under
+   * `path`. Entries whose name starts with "_" or with the `stagingDir` prefix are skipped.
+   *
+   * @param path directory that holds the Parquet files
+   * @return the schema recorded in the first footer
+   * @throws NoSuchElementException if no footer could be read under `path`
+   */
+  def readParquetSchema(path: String): MessageType = {
+    val fsPath = new Path(path)
+    val fs = fsPath.getFileSystem(configuration)
+    val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot { status =>
+      val name = status.getPath.getName
+      name.startsWith("_") || stagingDir.exists(prefix => name.startsWith(prefix))
+    }
+    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
+    // Fail with the offending path instead of a bare "head of empty" error.
+    val firstFooter = footers.headOption.getOrElse(
+      throw new NoSuchElementException(s"No Parquet footers found under $path"))
+    firstFooter.getParquetMetadata.getFileMetaData.getSchema
+  }
+}
+
+object ParquetCompatibilityTest {
+  /**
+   * Yields `null` for every `i` divisible by 3 and the by-name value `f` otherwise;
+   * `f` is evaluated only when it is the result.
+   */
+  def makeNullable[T <: AnyRef](i: Int)(f: => T): T = i % 3 match {
+    case 0 => null.asInstanceOf[T]
+    case _ => f
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
new file mode 100644
index 0000000..7dd9680
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.filter2.predicate.Operators._
+import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
+
+/**
+ * A test suite that tests Parquet filter2 API based filter pushdown 
optimization.
+ *
+ * NOTE:
+ *
+ * 1. `!(a cmp b)` is always transformed to its negated form `a cmp' b` by the
+ *    `BooleanSimplification` optimization rule whenever possible. As a 
result, predicate `!(a < 1)`
+ *    results in a `GtEq` filter predicate rather than a `Not`.
+ *
+ * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to 
ensure the inferred
+ *    data type is nullable.
+ */
+class ParquetFilterSuite extends QueryTest with ParquetTest {
+  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
+  // Core check: the pushed-down form of `predicate` must be a `filterClass`; rows go to `checker`.
+  private def checkFilterPredicate(
+      df: DataFrame,
+      predicate: Predicate,
+      filterClass: Class[_ <: FilterPredicate],
+      checker: (DataFrame, Seq[Row]) => Unit,
+      expected: Seq[Row]): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val query = df
+        .select(output.map(e => Column(e)): _*)
+        .where(Column(predicate))
+      // Collect the filters found above the Parquet relation in the optimized plan and AND them.
+      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+        case PhysicalOperation(_, filters, LogicalRelation(_: 
ParquetRelation)) => filters
+      }.flatten.reduceOption(_ && _)
+
+      assert(maybeAnalyzedPredicate.isDefined)
+      maybeAnalyzedPredicate.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(pred)
+        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for 
$pred")
+        maybeFilter.foreach { f =>
+          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+          assert(f.getClass === filterClass)
+        }
+      }
+
+      checker(query, expected)
+    }
+  }
+  // Overload: takes the DataFrame implicitly and compares rows with checkAnswer.
+  private def checkFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Seq[Row])
+      (implicit df: DataFrame): Unit = {
+    checkFilterPredicate(df, predicate, filterClass, checkAnswer(_, _: 
Seq[Row]), expected)
+  }
+  // Overload: a single expected value, wrapped as Seq(Row(expected)).
+  private def checkFilterPredicate[T]
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: T)
+      (implicit df: DataFrame): Unit = {
+    checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
+  }
+  // Binary variant: rows are compared as sorted, comma-joined byte strings, not via checkAnswer.
+  private def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Seq[Row])
+      (implicit df: DataFrame): Unit = {
+    def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = {
+      assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) 
{
+        df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted
+      }
+    }
+
+    checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, 
expected)
+  }
+  // Binary overload: a single expected byte array, wrapped as Seq(Row(expected)).
+  private def checkBinaryFilterPredicate
+      (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], 
expected: Array[Byte])
+      (implicit df: DataFrame): Unit = {
+    checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
+  }
+
+  test("filter pushdown - boolean") {
+    withParquetDataFrame((true :: false :: Nil).map(b => 
Tuple1.apply(Option(b)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), 
Row(false)))
+
+      checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
+      checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
+    }
+  }
+
+  test("filter pushdown - short") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { 
implicit df =>
+      checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate(
+        Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+      // Column on the left-hand side of the comparison.
+      checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
+      checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
+      checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
+      // Literal on the left-hand side: the expected filter class is the mirrored comparison.
+      checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), 
classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), 
classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), 
classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), 
classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), 
classOf[GtEq[_]], 4)
+      // Negated comparison and AND/OR combinations.
+      checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
+      checkFilterPredicate(
+        Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, 
classOf[Operators.And], 3)
+      checkFilterPredicate(
+        Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
+        classOf[Operators.Or],
+        Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - integer") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - long") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { 
implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - float") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { 
implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - double") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { 
implicit df =>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 
4).map(Row.apply(_)))
+
+      checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
+      checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4)
+      checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1)
+      checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
+      checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
+      checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
+      checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
+
+      checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
+      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
+      checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], 
Seq(Row(1), Row(4)))
+    }
+  }
+
+  test("filter pushdown - string") {
+    withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df 
=>
+      checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkFilterPredicate(
+        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.toString)))
+
+      checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
+      checkFilterPredicate(
+        '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.toString)))
+
+      checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1")
+      checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4")
+      checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1")
+      checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
+
+      checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
+      checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
+      checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
+      checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
+      checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
+
+      checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
+      checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
+      checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], 
Seq(Row("1"), Row("4")))
+    }
+  }
+
+  test("filter pushdown - binary") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes("UTF-8")
+    }
+
+    withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
+      checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
+
+      checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
+      checkBinaryFilterPredicate(
+        '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => 
Row.apply(i.b)).toSeq)
+
+      checkBinaryFilterPredicate(
+        '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => 
Row.apply(i.b)).toSeq)
+
+      checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b)
+      checkBinaryFilterPredicate('_1 > 3.b, classOf[Gt[_]], 4.b)
+      checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b)
+      checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
+
+      checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
+      checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
+      checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
+      checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
+      checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
+
+      checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
+      checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, 
classOf[Operators.And], 3.b)
+      checkBinaryFilterPredicate(
+        '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
+    }
+  }
+
+  test("SPARK-6554: don't push down predicates which reference partition 
columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an 
exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("part = 1"),
+          (1 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
new file mode 100644
index 0000000..ee925af
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.example.data.{Group, GroupWriter}
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, 
ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, 
ParquetOutputCommitter, ParquetWriter}
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+ * Write support class for nested groups.
+ *
+ * ParquetWriter initializes GroupWriteSupport with an empty configuration (it is after all not
+ * intended to be used in this way?) and members are private, so we need to make our own in order
+ * to pass the schema to the writer.
+ *
+ * @param schema Parquet message type handed to the write context and to the group writer
+ */
+private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
+  // Assigned in `prepareForWrite`; remains null until that method has been called.
+  var groupWriter: GroupWriter = null
+
+  /** Creates the group writer that forwards records to `recordConsumer` using `schema`. */
+  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+    groupWriter = new GroupWriter(recordConsumer, schema)
+  }
+
+  /** Returns a write context carrying `schema` and an empty extra-metadata map. */
+  override def init(configuration: Configuration): WriteContext = {
+    new WriteContext(schema, new java.util.HashMap[String, String]())
+  }
+
+  /** Writes one record through the group writer created in `prepareForWrite`. */
+  // Explicit `: Unit =` replaces the deprecated procedure syntax, matching `prepareForWrite`.
+  override def write(record: Group): Unit = {
+    groupWriter.write(record)
+  }
+}
+
+/**
+ * A test suite that tests basic Parquet I/O.
+ */
+class ParquetIOSuite extends QueryTest with ParquetTest {
+  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
+  import sqlContext.implicits._
+
+  /**
+   * Writes `data` to a Parquet file, reads it back and checks that the rows match the original
+   * tuples.
+   */
+  protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = {
+    val expectedRows = data.map(Row.fromTuple)
+    withParquetDataFrame(data) { df =>
+      checkAnswer(df, expectedRows)
+    }
+  }
+
+  test("basic data types (without binary)") {
+    // One column each of Boolean, Int, Long, Float and Double.
+    val rows = for (n <- 1 to 4) yield (n % 2 == 0, n, n.toLong, n.toFloat, n.toDouble)
+    checkParquetFile(rows)
+  }
+
+  test("raw binary") {
+    val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
+    // Byte arrays are compared through their sorted, comma-joined string renderings.
+    val expected = data.map { case Tuple1(bytes) => bytes.mkString(",") }.sorted
+    withParquetDataFrame(data) { df =>
+      val actual = df.collect().map(row => row.getAs[Array[Byte]](0).mkString(",")).sorted
+      assertResult(expected)(actual)
+    }
+  }
+
+  test("string") {
+    val data = (1 to 4).map(i => Tuple1(i.toString))
+    // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by
+    // Spark SQL as we store Spark SQL schema in the extra metadata.
+    Seq("false", "true").foreach { binaryAsString =>
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> binaryAsString) {
+        checkParquetFile(data)
+      }
+    }
+  }
+
+  test("fixed-length decimals") {
+    // Builds a single-column DataFrame of the values 0.00 to 10.00 cast to `decimal`.
+    def makeDecimalRDD(decimal: DecimalType): DataFrame = {
+      val doubles = sqlContext.sparkContext.parallelize(0 to 1000).map(i => Tuple1(i / 100.0))
+      // Parquet doesn't allow column names with spaces, have to add an alias here
+      doubles.toDF().select($"_1" cast decimal as "dec")
+    }
+
+    val precisionsAndScales =
+      Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
+
+    precisionsAndScales.foreach { case (precision, scale) =>
+      withTempPath { dir =>
+        val location = dir.getCanonicalPath
+        val data = makeDecimalRDD(DecimalType(precision, scale))
+        data.write.parquet(location)
+        checkAnswer(sqlContext.read.parquet(location), data.collect().toSeq)
+      }
+    }
+  }
+
+  test("date type") {
+    // Builds a single-column DataFrame of dates produced by `DateTimeUtils.toJavaDate`.
+    def makeDateRDD(): DataFrame = {
+      val dates = sqlContext.sparkContext
+        .parallelize(0 to 1000)
+        .map(n => Tuple1(DateTimeUtils.toJavaDate(n)))
+      dates.toDF().select($"_1")
+    }
+
+    withTempPath { dir =>
+      val location = dir.getCanonicalPath
+      val data = makeDateRDD()
+      data.write.parquet(location)
+      checkAnswer(sqlContext.read.parquet(location), data.collect().toSeq)
+    }
+  }
+
+  test("map") {
+    // Each row holds one single-entry Map[Int, String] column.
+    val rows = for (i <- 1 to 4) yield Tuple1(Map(i -> s"val_$i"))
+    checkParquetFile(rows)
+  }
+
+  test("array") {
+    // Each row holds one two-element Seq[Int] column.
+    val rows = for (i <- 1 to 4) yield Tuple1(Seq(i, i + 1))
+    checkParquetFile(rows)
+  }
+
+  test("array and double") {
+    // A Double column followed by a Seq[Double] column.
+    val rows = (1 to 4).map { i =>
+      val current = i.toDouble
+      val next = (i + 1).toDouble
+      (current, Seq(current, next))
+    }
+    checkParquetFile(rows)
+  }
+
+  test("struct") {
+    val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
+    // Structs are converted to `Row`s
+    val expected = data.map { case Tuple1(struct) => Row(Row.fromTuple(struct)) }
+    withParquetDataFrame(data) { df =>
+      checkAnswer(df, expected)
+    }
+  }
+
+  test("nested struct with array of array as field") {
+    val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
+    // Structs are converted to `Row`s
+    val expected = data.map { case Tuple1(struct) => Row(Row.fromTuple(struct)) }
+    withParquetDataFrame(data) { df =>
+      checkAnswer(df, expected)
+    }
+  }
+
+  test("nested map with struct as value type") {
+    val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
+    withParquetDataFrame(data) { df =>
+      // Struct values inside the map are expected to come back as `Row`s.
+      val expected = data.map { case Tuple1(m) =>
+        Row(m.mapValues(struct => Row.fromTuple(struct)))
+      }
+      checkAnswer(df, expected)
+    }
+  }
+
+  test("nulls") {
+    // Boxed Java types are used so that every tuple field can hold a null.
+    val allNulls = (
+      null: java.lang.Boolean,
+      null: Integer,
+      null: java.lang.Long,
+      null: java.lang.Float,
+      null: java.lang.Double)
+
+    withParquetDataFrame(allNulls :: Nil) { df =>
+      val collected = df.collect()
+      assert(collected.length === 1)
+      assert(collected.head === Row.fromSeq(Seq.fill(5)(null)))
+    }
+  }
+
+  test("nones") {
+    // A single row whose three optional fields are all empty.
+    val allNones = (Option.empty[Int], Option.empty[Long], Option.empty[String])
+
+    withParquetDataFrame(allNones :: Nil) { df =>
+      val collected = df.collect()
+      assert(collected.length === 1)
+      assert(collected.head === Row.fromSeq(Seq.fill(3)(null)))
+    }
+  }
+
+  test("compression codec") {
+    // Returns the name of the one codec used by all column chunks of the file at `path`;
+    // fails if more than one distinct codec is found.
+    def compressionCodecFor(path: String): String = {
+      val metadata = ParquetTypesConverter.readMetaData(new Path(path), Some(configuration))
+      val codecs = metadata.getBlocks.flatMap(_.getColumns).map(_.getCodec.name()).distinct
+      assert(codecs.size === 1)
+      codecs.head
+    }
+
+    val data = (0 until 10).map(i => (i, i.toString))
+
+    // Writes `data` with `codec` configured and checks the codec recorded in the file.
+    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
+        withParquetFile(data) { path =>
+          val expectedCodec = sqlContext.conf.parquetCompressionCodec.toUpperCase
+          assertResult(expectedCodec)(compressionCodecFor(path))
+        }
+      }
+    }
+
+    // Checks default compression codec
+    val defaultCodec = CompressionCodecName.fromConf(sqlContext.conf.parquetCompressionCodec)
+    checkCompressionCodec(defaultCodec)
+
+    Seq(
+      CompressionCodecName.UNCOMPRESSED,
+      CompressionCodecName.GZIP,
+      CompressionCodecName.SNAPPY
+    ).foreach(codec => checkCompressionCodec(codec))
+  }
+
+  test("read raw Parquet file") {
+    // Writes ten records directly through ParquetWriter, bypassing Spark SQL, so that the
+    // file carries only the Parquet schema declared below.
+    def makeRawParquetFile(path: Path): Unit = {
+      val schema = MessageTypeParser.parseMessageType(
+        """
+          |message root {
+          |  required boolean _1;
+          |  required int32   _2;
+          |  required int64   _3;
+          |  required float   _4;
+          |  required double  _5;
+          |}
+        """.stripMargin)
+
+      val writeSupport = new TestGroupWriteSupport(schema)
+      val writer = new ParquetWriter[Group](path, writeSupport)
+
+      // Close the writer even when a write fails, so the file handle is not leaked.
+      try {
+        (0 until 10).foreach { i =>
+          val record = new SimpleGroup(schema)
+          record.add(0, i % 2 == 0)
+          record.add(1, i)
+          record.add(2, i.toLong)
+          record.add(3, i.toFloat)
+          record.add(4, i.toDouble)
+          writer.write(record)
+        }
+      } finally {
+        writer.close()
+      }
+    }
+
+    withTempDir { dir =>
+      val path = new Path(dir.toURI.toString, "part-r-0.parquet")
+      makeRawParquetFile(path)
+      checkAnswer(sqlContext.read.parquet(path.toString), (0 until 10).map { i =>
+        Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
+      })
+    }
+  }
+
+  test("write metadata") {
+    withTempPath { file =>
+      val path = new Path(file.toURI.toString)
+      val fs = FileSystem.getLocal(configuration)
+      val attributes = ScalaReflection.attributesFor[(Int, String)]
+      ParquetTypesConverter.writeMetaData(attributes, path, configuration)
+
+      // Both metadata files must exist under `path` after the write.
+      val metadataFileNames = Seq(
+        ParquetFileWriter.PARQUET_COMMON_METADATA_FILE,
+        ParquetFileWriter.PARQUET_METADATA_FILE)
+      metadataFileNames.foreach { name =>
+        assert(fs.exists(new Path(path, name)))
+      }
+
+      val actualSchema =
+        ParquetTypesConverter.readMetaData(path, Some(configuration)).getFileMetaData.getSchema
+      val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+
+      // Containment is checked in both directions.
+      actualSchema.checkContains(expectedSchema)
+      expectedSchema.checkContains(actualSchema)
+    }
+  }
+
+  test("save - overwrite") {
+    val initial = (1 to 10).map(i => (i, i.toString))
+    withParquetFile(initial) { file =>
+      // After overwriting, only the replacement rows should be readable.
+      val replacement = (11 to 20).map(i => (i, i.toString))
+      replacement.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file)
+      checkAnswer(sqlContext.read.parquet(file), replacement.map(Row.fromTuple))
+    }
+  }
+
+  test("save - ignore") {
+    val existing = (1 to 10).map(i => (i, i.toString))
+    withParquetFile(existing) { file =>
+      // With SaveMode.Ignore the existing rows are expected to remain untouched.
+      val ignored = (11 to 20).map(i => (i, i.toString))
+      ignored.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file)
+      checkAnswer(sqlContext.read.parquet(file), existing.map(Row.fromTuple))
+    }
+  }
+
+  test("save - throw") {
+    val existing = (1 to 10).map(i => (i, i.toString))
+    withParquetFile(existing) { file =>
+      val rejected = (11 to 20).map(i => (i, i.toString))
+      // Saving over an existing location with ErrorIfExists must fail.
+      val thrown = intercept[Throwable] {
+        rejected.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file)
+      }
+      assert(thrown.getMessage.contains("already exists"))
+    }
+  }
+
+  test("save - append") {
+    val existing = (1 to 10).map(i => (i, i.toString))
+    withParquetFile(existing) { file =>
+      val appended = (11 to 20).map(i => (i, i.toString))
+      appended.toDF().write.format("parquet").mode(SaveMode.Append).save(file)
+      // Both the original and the appended rows should be readable.
+      val expected = (existing ++ appended).map(Row.fromTuple)
+      checkAnswer(sqlContext.read.parquet(file), expected)
+    }
+  }
+
+  test("SPARK-6315 regression test") {
+    // Spark 1.1 and prior versions write Spark schema as case class string into Parquet
+    // metadata. This has been deprecated by JSON format since 1.2.  Notice that, 1.3 further
+    // refactored data types API, and made StructType.fields an array.  This makes the result of
+    // StructType.toString different from prior versions: there's no "Seq" wrapping the fields
+    // part in the string now.
+    val sparkSchema =
+      "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))"
+
+    // The Parquet schema is intentionally made different from the Spark schema.  Because the
+    // new Parquet data source simply falls back to the Parquet schema once it fails to parse the
+    // Spark schema.  By making these two different, we are able to assert the old style case
+    // class string is parsed successfully.
+    val parquetSchema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int32 c;
+        |}
+      """.stripMargin)
+
+    withTempPath { location =>
+      // `sparkSchema` is already a String, so it is stored as-is.
+      val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema)
+      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
+      val path = new Path(location.getCanonicalPath)
+
+      ParquetFileWriter.writeMetadataFile(
+        sqlContext.sparkContext.hadoopConfiguration,
+        path,
+        new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
+
+      val expectedSchema = StructType(
+        StructField("a", BooleanType, nullable = false) ::
+        StructField("b", IntegerType, nullable = false) ::
+        Nil)
+
+      // `assertResult` takes the expected value first and the actual value second, so that a
+      // failure message reports the two the right way round.
+      assertResult(expectedSchema) {
+        sqlContext.read.parquet(path.toString).schema
+      }
+    }
+  }
+
+  test("SPARK-6352 DirectParquetOutputCommitter") {
+    // Snapshot of `configuration`, used in the `finally` block to restore it.
+    val originalConf = new Configuration(configuration)
+
+    // Write to a parquet file and let it fail.
+    // _temporary should be missing if direct output committer works.
+    try {
+      val committerClassName = classOf[DirectParquetOutputCommitter].getCanonicalName
+      configuration.set("spark.sql.parquet.output.committer.class", committerClassName)
+      sqlContext.udf.register("div0", (x: Int) => x / 0)
+      withTempPath { dir =>
+        val outputDir = dir.getCanonicalPath
+        intercept[org.apache.spark.SparkException] {
+          sqlContext.sql("select div0(1)").write.parquet(outputDir)
+        }
+        val temporaryPath = new Path(outputDir, "_temporary")
+        val fs = temporaryPath.getFileSystem(configuration)
+        assert(!fs.exists(temporaryPath))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      originalConf.foreach { entry =>
+        configuration.set(entry.getKey, entry.getValue)
+      }
+    }
+  }
+
+  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overriden") {
+    withTempPath { dir =>
+      // Snapshot of `configuration`, used in the `finally` block to restore it.
+      val originalConf = new Configuration(configuration)
+
+      val genericCommitter = classOf[ParquetOutputCommitter].getCanonicalName
+      configuration.set(SQLConf.OUTPUT_COMMITTER_CLASS.key, genericCommitter)
+
+      val parquetCommitter = classOf[BogusParquetOutputCommitter].getCanonicalName
+      configuration.set("spark.sql.parquet.output.committer.class", parquetCommitter)
+
+      try {
+        // The write is expected to fail with the message thrown by the bogus committer.
+        val thrown = intercept[SparkException] {
+          sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
+        }
+        val message = thrown.getCause.getMessage
+        assert(message === "Intentional exception for testing purposes")
+      } finally {
+        // Hadoop 1 doesn't have `Configuration.unset`
+        configuration.clear()
+        originalConf.foreach { entry =>
+          configuration.set(entry.getKey, entry.getValue)
+        }
+      }
+    }
+  }
+
+  test("SPARK-6330 regression test") {
+    // In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
+    // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
+    intercept[Throwable] {
+      sqlContext.read.parquet("file:///nonexistent")
+    }
+    val hdfsFailure = intercept[Throwable] {
+      sqlContext.read.parquet("hdfs://nonexistent")
+    }
+    assert(hdfsFailure.toString.contains("UnknownHostException"))
+  }
+}
+
+/** Output committer whose `commitJob` always fails with a fixed message. */
+class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(jobContext: JobContext): Unit =
+    throw new RuntimeException("Intentional exception for testing purposes")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
new file mode 100644
index 0000000..73152de
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+import java.math.BigInteger
+import java.sql.Timestamp
+
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
PartitionSpec, Partition, PartitioningUtils}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql._
+import org.apache.spark.unsafe.types.UTF8String
+import PartitioningUtils._
+
+/** Record whose partitioning key exists only in the directory structure. */
+case class ParquetData(
+    intField: Int,
+    stringField: String)
+
+/** Record that also includes the partitioning keys (`pi`, `ps`) as fields. */
+case class ParquetDataWithKey(
+    intField: Int,
+    pi: Int,
+    stringField: String,
+    ps: String)
+
+class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
+
+  override lazy val sqlContext: SQLContext = 
org.apache.spark.sql.test.TestSQLContext
+  import sqlContext.implicits._
+  import sqlContext.sql
+
+  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
+
+  test("column type inference") {
+    // Raw partition value -> literal expected from inference.
+    val cases = Seq(
+      "10" -> Literal.create(10, IntegerType),
+      "1000000000000000" -> Literal.create(1000000000000000L, LongType),
+      "1.5" -> Literal.create(1.5, DoubleType),
+      "hello" -> Literal.create("hello", StringType),
+      defaultPartitionName -> Literal.create(null, NullType))
+
+    cases.foreach { case (raw, expected) =>
+      val inferred = inferPartitionColumnValue(raw, defaultPartitionName, true)
+      assert(inferred === expected)
+    }
+  }
+
+  test("parse partition") {
+    // Asserts that parsing `path` yields exactly `expected`.
+    def check(path: String, expected: Option[PartitionValues]): Unit = {
+      val parsed = parsePartition(new Path(path), defaultPartitionName, true)
+      assert(expected === parsed)
+    }
+
+    // Asserts that parsing `path` throws a `T` whose message contains `expected`.
+    def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
+      val thrown = intercept[T] {
+        parsePartition(new Path(path), defaultPartitionName, true).get
+      }
+      assert(thrown.getMessage.contains(expected))
+    }
+
+    check(
+      "file://path/a=10",
+      Some(PartitionValues(ArrayBuffer("a"), ArrayBuffer(Literal.create(10, IntegerType)))))
+
+    check(
+      "file://path/a=10/b=hello/c=1.5",
+      Some(PartitionValues(
+        ArrayBuffer("a", "b", "c"),
+        ArrayBuffer(
+          Literal.create(10, IntegerType),
+          Literal.create("hello", StringType),
+          Literal.create(1.5, DoubleType)))))
+
+    check(
+      "file://path/a=10/b_hello/c=1.5",
+      Some(PartitionValues(ArrayBuffer("c"), ArrayBuffer(Literal.create(1.5, DoubleType)))))
+
+    // None of these paths is expected to produce partition values.
+    Seq(
+      "file:///",
+      "file:///path/_temporary",
+      "file:///path/_temporary/c=1.5",
+      "file:///path/_temporary/path",
+      "file://path/a=10/_temporary/c=1.5",
+      "file://path/a=10/c=1.5/_temporary"
+    ).foreach(path => check(path, None))
+
+    checkThrows[AssertionError]("file://path/=10", "Empty partition column name")
+    checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
+  }
+
+  test("parse partitions") {
+    // Asserts that parsing `paths` with type inference enabled yields `spec`.
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      val parsed = parsePartitions(paths.map(p => new Path(p)), defaultPartitionName, true)
+      assert(parsed === spec)
+    }
+
+    // Every expected schema has columns "a" and "b"; "b" is always a string column.
+    def schemaOf(aType: DataType): StructType =
+      StructType(Seq(StructField("a", aType), StructField("b", StringType)))
+
+    def utf8(s: String): UTF8String = UTF8String.fromString(s)
+
+    check(
+      Seq("hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        schemaOf(IntegerType),
+        Seq(Partition(InternalRow(10, utf8("hello")), "hdfs://host:9000/path/a=10/b=hello"))))
+
+    // Expected result shared by the next two cases.
+    val intAndDoubleSpec = PartitionSpec(
+      schemaOf(DoubleType),
+      Seq(
+        Partition(InternalRow(10, utf8("20")), "hdfs://host:9000/path/a=10/b=20"),
+        Partition(InternalRow(10.5, utf8("hello")), "hdfs://host:9000/path/a=10.5/b=hello")))
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/a=10/b=20",
+        "hdfs://host:9000/path/a=10.5/b=hello"),
+      intAndDoubleSpec)
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/_temporary",
+        "hdfs://host:9000/path/a=10/b=20",
+        "hdfs://host:9000/path/a=10.5/b=hello",
+        "hdfs://host:9000/path/a=10.5/_temporary",
+        "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+        "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+        "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+        "hdfs://host:9000/path/_temporary/path",
+        "hdfs://host:9000/path/a=11/_temporary/path",
+        "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      intAndDoubleSpec)
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/a=10/b=20",
+        s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
+      PartitionSpec(
+        schemaOf(IntegerType),
+        Seq(
+          Partition(InternalRow(10, utf8("20")), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(
+            InternalRow(null, utf8("hello")),
+            s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))
+
+    check(
+      Seq(
+        s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+        s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+      PartitionSpec(
+        schemaOf(DoubleType),
+        Seq(
+          Partition(
+            InternalRow(10, null),
+            s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+          Partition(
+            InternalRow(10.5, null),
+            s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(
+      Seq("hdfs://host:9000/path1", "hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
+  }
+
+  test("parse partitions with type inference disabled") {
+    // Asserts that parsing `paths` with type inference disabled yields `spec`.
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      val parsed = parsePartitions(paths.map(p => new Path(p)), defaultPartitionName, false)
+      assert(parsed === spec)
+    }
+
+    // With inference disabled both partition columns are expected to be strings.
+    val stringSchema = StructType(Seq(StructField("a", StringType), StructField("b", StringType)))
+
+    def utf8(s: String): UTF8String = UTF8String.fromString(s)
+
+    check(
+      Seq("hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        stringSchema,
+        Seq(Partition(
+          InternalRow(utf8("10"), utf8("hello")),
+          "hdfs://host:9000/path/a=10/b=hello"))))
+
+    // Expected result shared by the next two cases.
+    val twoPartitionSpec = PartitionSpec(
+      stringSchema,
+      Seq(
+        Partition(InternalRow(utf8("10"), utf8("20")), "hdfs://host:9000/path/a=10/b=20"),
+        Partition(
+          InternalRow(utf8("10.5"), utf8("hello")),
+          "hdfs://host:9000/path/a=10.5/b=hello")))
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/a=10/b=20",
+        "hdfs://host:9000/path/a=10.5/b=hello"),
+      twoPartitionSpec)
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/_temporary",
+        "hdfs://host:9000/path/a=10/b=20",
+        "hdfs://host:9000/path/a=10.5/b=hello",
+        "hdfs://host:9000/path/a=10.5/_temporary",
+        "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+        "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+        "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+        "hdfs://host:9000/path/_temporary/path",
+        "hdfs://host:9000/path/a=11/_temporary/path",
+        "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      twoPartitionSpec)
+
+    check(
+      Seq(
+        "hdfs://host:9000/path/a=10/b=20",
+        s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
+      PartitionSpec(
+        stringSchema,
+        Seq(
+          Partition(InternalRow(utf8("10"), utf8("20")), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(
+            InternalRow(null, utf8("hello")),
+            s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))
+
+    check(
+      Seq(
+        s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+        s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+      PartitionSpec(
+        stringSchema,
+        Seq(
+          Partition(
+            InternalRow(utf8("10"), null),
+            s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+          Partition(
+            InternalRow(utf8("10.5"), null),
+            s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(
+      Seq("hdfs://host:9000/path1", "hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
+  }
+
+  test("read partitioned table - normal case") {
+    withTempDir { base =>
+      val piValues = Seq(1, 2)
+      val psValues = Seq("foo", "bar")
+
+      // One partition directory per (pi, ps) combination, each holding the same ten records.
+      for (pi <- piValues; ps <- psValues) {
+        val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
+        makeParquetFile((1 to 10).map(i => ParquetData(i, i.toString)), dir)
+        // Introduce _temporary dir to test the robustness of the schema discovery process.
+        new File(dir.toString, "_temporary").mkdir()
+      }
+      // Introduce _temporary dir to the base dir to test the robustness of the schema discovery
+      // process.
+      new File(base.getCanonicalPath, "_temporary").mkdir()
+
+      sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for (i <- 1 to 10; pi <- piValues; ps <- psValues) yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for (i <- 1 to 10; pi <- piValues; _ <- psValues) yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for (i <- 1 to 10; ps <- psValues) yield Row(i, i.toString, 1, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for (i <- 1 to 10; pi <- piValues) yield Row(i, i.toString, pi, "foo"))
+      }
+    }
+  }
+
+  test("read partitioned table - partition key included in Parquet file") {
+    withTempDir { base =>
+      val piValues = Seq(1, 2)
+      val psValues = Seq("foo", "bar")
+
+      // The partition keys are stored in the records as well as in the directory names.
+      for (pi <- piValues; ps <- psValues) {
+        makeParquetFile(
+          (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for (i <- 1 to 10; pi <- piValues; ps <- psValues) yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for (i <- 1 to 10; pi <- piValues; _ <- psValues) yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for (i <- 1 to 10; ps <- psValues) yield Row(i, 1, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for (i <- 1 to 10; pi <- piValues) yield Row(i, pi, i.toString, "foo"))
+      }
+    }
+  }
+
+  test("read partitioned table - with nulls") {
+    withTempDir { base =>
+      // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+      val piValues = Seq(1, null.asInstanceOf[Integer])
+      val psValues = Seq("foo", null.asInstanceOf[String])
+
+      for (pi <- piValues; ps <- psValues) {
+        makeParquetFile(
+          (1 to 10).map(i => ParquetData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath)
+      parquetRelation.registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for (i <- 1 to 10; pi <- piValues; ps <- psValues) yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi IS NULL"),
+          for (i <- 1 to 10; ps <- psValues) yield Row(i, i.toString, null, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for (i <- 1 to 10; pi <- piValues) yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+
+  test("read partitioned table - with nulls and partition keys are included in Parquet file") {
+    withTempDir { base =>
+      val piValues = Seq(1, 2)
+      val psValues = Seq("foo", null.asInstanceOf[String])
+
+      // The partition keys are stored in the records as well as in the directory names.
+      for (pi <- piValues; ps <- psValues) {
+        makeParquetFile(
+          (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath)
+      parquetRelation.registerTempTable("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for (i <- 1 to 10; pi <- piValues; ps <- psValues) yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for (i <- 1 to 10; pi <- piValues) yield Row(i, pi, i.toString, null))
+      }
+    }
+  }
+
+  test("read partitioned table - merging compatible schemas") {
+    withTempDir { base =>
+      val ids = 1 to 10
+
+      // Partition pi=1 only carries `intField`.
+      val narrow = ids.map(i => Tuple1(i)).toDF("intField")
+      makeParquetFile(narrow, makePartitionDir(base, defaultPartitionName, "pi" -> 1))
+
+      // Partition pi=2 carries `intField` plus an extra `stringField`.
+      val wide = ids.map(i => (i, i.toString)).toDF("intField", "stringField")
+      makeParquetFile(wide, makePartitionDir(base, defaultPartitionName, "pi" -> 2))
+
+      val merged = sqlContext.read
+        .option("mergeSchema", "true")
+        .format("parquet")
+        .load(base.getCanonicalPath)
+      merged.registerTempTable("t")
+
+      withTempTable("t") {
+        // Rows from pi=1 are expected to show null for the column they never had.
+        val fromFirstPartition = ids.map(i => Row(i, null, 1))
+        val fromSecondPartition = ids.map(i => Row(i, i.toString, 2))
+        checkAnswer(sql("SELECT * FROM t"), fromFirstPartition ++ fromSecondPartition)
+      }
+    }
+  }
+
+  test("SPARK-7749 Non-partitioned table should have empty partition spec") {
+    withTempPath { dir =>
+      (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
+      val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
+      // Find the Parquet relation in the analyzed plan and verify its partition spec. If no
+      // such relation is present the test fails explicitly instead of silently passing.
+      queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: ParquetRelation) =>
+          assert(relation.partitionSpec === PartitionSpec.emptySpec)
+      }.getOrElse {
+        // The message names the class that the pattern above actually matches.
+        fail(s"Expecting a ParquetRelation, but got:\n$queryExecution")
+      }
+    }
+  }
+
+  test("SPARK-7847: Dynamic partition directory path escaping and unescaping") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      // Values for the partition column `s`; column `i` is each value's index.
+      val partitionValues = Seq("/", "[]", "?")
+      val df = partitionValues.zipWithIndex.map { case (s, i) => (i, s) }.toDF("i", "s")
+
+      df.write.format("parquet").partitionBy("s").save(path)
+
+      // Reading the directory back must reproduce exactly what was written.
+      val reloaded = sqlContext.read.parquet(path)
+      checkAnswer(reloaded, df.collect())
+    }
+  }
+
+  test("Various partition value types") {
+    // One value per partition column type listed below, in the same order, followed by the
+    // value of the single non-partition column.
+    val expectedRow = Row(
+      100.toByte,
+      // 40000 does not fit in a Short; `toShort` wraps it around to -25536.
+      40000.toShort,
+      Int.MaxValue,
+      Long.MaxValue,
+      1.5.toFloat,
+      4.5,
+      new java.math.BigDecimal(new BigInteger("212500"), 5),
+      new java.math.BigDecimal(2.125),
+      java.sql.Date.valueOf("2015-05-23"),
+      new Timestamp(0),
+      "This is a string, /[]?=:",
+      "This is not a partition column")
+
+    // BooleanType is not supported yet
+    val partitionColumnTypes = Seq(
+      ByteType,
+      ShortType,
+      IntegerType,
+      LongType,
+      FloatType,
+      DoubleType,
+      DecimalType(10, 5),
+      DecimalType.SYSTEM_DEFAULT,
+      DateType,
+      TimestampType,
+      StringType)
+
+    // Partition columns are named p_0, p_1, ... after their position in the list above.
+    val partitionColumns =
+      for {
+        (dataType, idx) <- partitionColumnTypes.zipWithIndex
+      } yield StructField(s"p_$idx", dataType)
+
+    val schema = StructType(partitionColumns :+ StructField("i", StringType))
+    val rowRDD = sqlContext.sparkContext.parallelize(expectedRow :: Nil)
+    val df = sqlContext.createDataFrame(rowRDD, schema)
+
+    withTempPath { dir =>
+      val partitionNames = partitionColumns.map(_.name)
+      df.write.format("parquet").partitionBy(partitionNames: _*).save(dir.toString)
+
+      // Select every column cast to its declared type before comparing with the input row.
+      val fields = schema.map(f => Column(f.name).cast(f.dataType))
+      checkAnswer(sqlContext.read.load(dir.toString).select(fields: _*), expectedRow)
+    }
+  }
+
+  test("SPARK-8037: Ignores files whose name starts with dot") {
+    withTempPath { dir =>
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+      val root = dir.getCanonicalPath
+
+      df.write.format("parquet").partitionBy("b", "c", "d").save(root)
+
+      // Add a dot-file and a dot-directory inside the partition tree; the read below is
+      // expected to return the original data regardless.
+      Files.touch(new File(s"$root/b=1", ".DS_Store"))
+      Files.createParentDirs(new File(s"$root/b=1/c=1/.foo/bar"))
+
+      checkAnswer(sqlContext.read.format("parquet").load(root), df)
+    }
+  }
+
+  // Checks the text produced by `listConflictingPartitionColumns` for three sets of
+  // (path, partition values) pairs against a locally assembled expected message.
+  test("listConflictingPartitionColumns") {
+    // Builds the expected message: one numbered "Partition column name list" line per entry
+    // of `colNameLists`, the fixed explanatory text, then one tab-indented line per path.
+    // The result is margin-stripped and trimmed, which is why callers trim the actual text too.
+    def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): 
String = {
+      val conflictingColNameLists = colNameLists.zipWithIndex.map { case 
(list, index) =>
+        s"\tPartition column name list #$index: $list"
+      }.mkString("\n", "\n", "\n")
+
+      // scalastyle:off
+      s"""Conflicting partition column names detected:
+         |$conflictingColNameLists
+         |For partitioned table directories, data files should only live in 
leaf directories.
+         |And directories at the same level should have the same partition 
column name.
+         |Please check the following directories for unexpected files or 
inconsistent partition column names:
+         |${paths.map("\t" + _).mkString("\n", "\n", "")}
+       """.stripMargin.trim
+      // scalastyle:on
+    }
+
+    // Case 1: sibling directories using different column names ("a" vs "b").
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), 
Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), 
Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", 
"file:/tmp/foo/b=1")))
+
+    // Case 2: both paths carry the same column list ("a"); the expected message contains a
+    // single column name list but still names both paths.
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), 
Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), 
Seq(Literal(1)))))).trim ===
+        makeExpectedMessage(
+          Seq("a"),
+          Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+
+    // Case 3: paths at different depths; the column lists are expected to be rendered as
+    // "a" and "a, b".
+    assert(
+      listConflictingPartitionColumns(
+        Seq(
+          (new Path("file:/tmp/foo/a=1"),
+            PartitionValues(Seq("a"), Seq(Literal(1)))),
+          (new Path("file:/tmp/foo/a=1/b=foo"),
+            PartitionValues(Seq("a", "b"), Seq(Literal(1), 
Literal("foo")))))).trim ===
+        makeExpectedMessage(
+          Seq("a", "a, b"),
+          Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+  }
+
+  test("Parallel partition discovery") {
+    withTempPath { dir =>
+      val thresholdKey = SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key
+      // Threshold of 1: presumably low enough that discovery takes the parallel code path.
+      withSQLConf(thresholdKey -> "1") {
+        val basePath = dir.getCanonicalPath
+        val expected = sqlContext.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1)
+
+        expected.write.partitionBy("b", "c").parquet(basePath)
+
+        // Reading the partitioned directory back must yield the data that was written.
+        checkAnswer(sqlContext.read.parquet(basePath), expected)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
new file mode 100644
index 0000000..5e6d9c1
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils
+
+/**
+ * A test suite that tests various Parquet queries: plain selects, inserts into Parquet-backed
+ * tables, self-joins, nested data, decimal/timestamp round trips and schema merging options.
+ */
+class ParquetQuerySuite extends QueryTest with ParquetTest {
+  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
+  import sqlContext.sql
+
+  test("simple select queries") {
+    withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
+      checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 5).map(Row.apply(_)))
+    }
+  }
+
+  test("appending") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
+    // Unregister "tmp" even when the body throws, so a failing assertion cannot leave the
+    // temporary table behind on this SQLContext.
+    try {
+      withParquetTable(data, "t") {
+        sql("INSERT INTO TABLE t SELECT * FROM tmp")
+        checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
+      }
+    } finally {
+      sqlContext.catalog.unregisterTable(Seq("tmp"))
+    }
+  }
+
+  test("overwriting") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp")
+    // Unregister "tmp" even when the body throws, so a failing assertion cannot leave the
+    // temporary table behind on this SQLContext.
+    try {
+      withParquetTable(data, "t") {
+        sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
+        checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
+      }
+    } finally {
+      sqlContext.catalog.unregisterTable(Seq("tmp"))
+    }
+  }
+
+  test("self-join") {
+    // 4 rows, cells of column 1 of row 2 and row 4 are null
+    val data = (1 to 4).map { i =>
+      val maybeInt = if (i % 2 == 0) None else Some(i)
+      (maybeInt, i.toString)
+    }
+
+    withParquetTable(data, "t") {
+      val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1")
+      val queryOutput = selfJoin.queryExecution.analyzed.output
+
+      assertResult(4, "Field count mismatches")(queryOutput.size)
+      // Count *distinct* expression IDs: without `distinct` the size is 2 whenever two `_1`
+      // attributes exist, even if both share one ID, and the check could never fail.
+      assertResult(2, s"Duplicated expression ID in query plan:\n $selfJoin") {
+        queryOutput.filter(_.name == "_1").map(_.exprId).distinct.size
+      }
+
+      checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3")))
+    }
+  }
+
+  test("nested data - struct with array field") {
+    // The `s` interpolator gives every row its own string ("val_1", "val_2", ...).
+    val data = (1 to 10).map(i => Tuple1((i, Seq(s"val_$i"))))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map {
+        case Tuple1((_, Seq(string))) => Row(string)
+      })
+    }
+  }
+
+  test("nested data - array of struct") {
+    // The `s` interpolator gives every row its own string ("val_1", "val_2", ...).
+    val data = (1 to 10).map(i => Tuple1(Seq(i -> s"val_$i")))
+    withParquetTable(data, "t") {
+      checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map {
+        case Tuple1(Seq((_, string))) => Row(string)
+      })
+    }
+  }
+
+  test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") {
+    withParquetTable((1 to 10).map(Tuple1.apply), "t") {
+      checkAnswer(sql("SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
+    }
+  }
+
+  test("SPARK-5309 strings stored using dictionary compression in parquet") {
+    // 1000 rows: a constant string, one of ten "run_N" labels (100 rows each) and the value 1.
+    withParquetTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
+
+      checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+        (0 until 10).map(i => Row("same", "run_" + i, 100)))
+
+      checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
+        List(Row("same", "run_5", 100)))
+    }
+  }
+
+  test("SPARK-6917 DecimalType should work with non-native types") {
+    val data = (1 to 10).map(i => Row(Decimal(i, 18, 0), new java.sql.Timestamp(i)))
+    val schema = StructType(List(StructField("d", DecimalType(18, 0), false),
+      StructField("time", TimestampType, false)).toArray)
+    withTempPath { file =>
+      val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data), schema)
+      df.write.parquet(file.getCanonicalPath)
+      // What is read back must equal what was written.
+      val df2 = sqlContext.read.parquet(file.getCanonicalPath)
+      checkAnswer(df2, df.collect().toSeq)
+    }
+  }
+
+  test("Enabling/disabling merging partfiles when merging parquet schema") {
+    // Writes two partitions with different columns ("a" and "b"), removes the summary files
+    // of the first one and asserts how many columns the loaded table exposes.
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        // delete summary files, so if we don't merge part-files, one column will not be included.
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    // Respecting summaries: one column is missing, so only two columns are expected.
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+      testSchemaMerging(2)
+    }
+
+    // Not respecting summaries: all three columns ("a", "b" and "foo") are expected.
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+      testSchemaMerging(3)
+    }
+  }
+
+  test("Enabling/disabling schema merging") {
+    // Writes two partitions with different columns ("a" and "b") and asserts how many columns
+    // the loaded table exposes under the currently active SQL configuration.
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      testSchemaMerging(3)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
+      testSchemaMerging(2)
+    }
+  }
+
+  test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
+    withTempPath { dir =>
+      val basePath = dir.getCanonicalPath
+      sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+      sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString)
+
+      // Disables the global SQL option for schema merging
+      withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") {
+        assertResult(2) {
+          // Disables schema merging via data source option
+          sqlContext.read.option("mergeSchema", "false").parquet(basePath).columns.length
+        }
+
+        assertResult(3) {
+          // Enables schema merging via data source option
+          sqlContext.read.option("mergeSchema", "true").parquet(basePath).columns.length
+        }
+      }
+    }
+  }
+
+  test("SPARK-9119 Decimal should be correctly written into parquet") {
+    withTempPath { dir =>
+      val basePath = dir.getCanonicalPath
+      val schema = StructType(Array(StructField("name", DecimalType(10, 5), false)))
+      val rowRDD = sqlContext.sparkContext.parallelize(Array(Row(Decimal("67123.45"))))
+      val df = sqlContext.createDataFrame(rowRDD, schema)
+      df.write.parquet(basePath)
+
+      // The value read back must compare equal to the value that was written.
+      val decimal = sqlContext.read.parquet(basePath).first().getDecimal(0)
+      assert(Decimal("67123.45") === Decimal(decimal))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to