[SPARK-5310][SQL] Fixes to Docs and Datasources API

 - Various fixes to docs
 - Make data source traits actually interfaces (see the sketch below)

Based on #4862 but with fixed conflicts.
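
The second bullet means the scan traits in org.apache.spark.sql.sources (TableScan,
PrunedScan, PrunedFilteredScan, InsertableRelation, CatalystScan) no longer extend
BaseRelation themselves, so a concrete relation now extends BaseRelation explicitly and
mixes a scan trait in. A minimal Scala sketch of the resulting shape, using hypothetical
DummyScan/DummySource names (illustrative only, not part of this patch):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // After this change a relation must extend BaseRelation *and* mix in a scan trait.
    case class DummyScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
      extends BaseRelation with TableScan {

      override def schema: StructType =
        StructType(StructField("i", IntegerType, nullable = false) :: Nil)

      override def buildScan(): RDD[Row] =
        sqlContext.sparkContext.parallelize(from to to).map(Row(_))
    }

    class DummySource extends RelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation =
        DummyScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
    }

The same pattern appears throughout the diff below (JDBCRelation, JSONRelation,
ParquetRelation2, and the scan test suites all gain "extends BaseRelation with ...").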

Author: Reynold Xin <r...@databricks.com>
Author: Michael Armbrust <mich...@databricks.com>

Closes #4868 from marmbrus/pr/4862 and squashes the following commits:

fe091ea [Michael Armbrust] Merge remote-tracking branch 'origin/master' into pr/4862
0208497 [Reynold Xin] Test fixes.
34e0a28 [Reynold Xin] [SPARK-5310][SQL] Various fixes to Spark SQL docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d19689
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d19689
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d19689

Branch: refs/heads/master
Commit: 54d19689ff8d786acde5b8ada6741854ffadadea
Parents: 1259994
Author: Reynold Xin <r...@databricks.com>
Authored: Mon Mar 2 22:14:08 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Mar 2 22:14:08 2015 -0800

----------------------------------------------------------------------
 project/SparkBuild.scala                        |  29 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  36 +-
 .../scala/org/apache/spark/sql/RDDApi.scala     |   4 +-
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |   3 +-
 .../apache/spark/sql/json/JSONRelation.scala    |   5 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   3 +-
 .../apache/spark/sql/sources/interfaces.scala   |  43 +-
 .../apache/spark/sql/sources/DDLTestSuite.scala |   2 +-
 .../spark/sql/sources/FilteredScanSuite.scala   |   3 +-
 .../spark/sql/sources/PrunedScanSuite.scala     |   3 +-
 .../spark/sql/sources/TableScanSuite.scala      |  11 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   3 +-
 .../hive/execution/CreateTableAsSelect.scala    |   3 +-
 .../execution/DescribeHiveTableCommand.scala    |   4 +-
 .../sql/hive/execution/HiveNativeCommand.scala  |   6 +-
 .../sql/hive/execution/HiveTableScan.scala      |   4 +-
 .../hive/execution/InsertIntoHiveTable.scala    |   6 +-
 .../hive/execution/ScriptTransformation.scala   |  15 +-
 .../spark/sql/hive/execution/commands.scala     |  27 +-
 .../spark/sql/hive/execution/package.scala      |  25 -
 .../spark/sql/hive/HiveParquetSuite.scala       |  92 +++
 .../apache/spark/sql/hive/parquetSuites.scala   | 767 +++++++++++++++++++
 .../spark/sql/parquet/HiveParquetSuite.scala    |  91 ---
 .../spark/sql/parquet/parquetSuites.scala       | 766 ------------------
 24 files changed, 965 insertions(+), 986 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e4b1b96..4f17df5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -357,6 +357,21 @@ object Unidoc {
     names.map(s => "org.apache.spark." + s).mkString(":")
   }
 
+  private def ignoreUndocumentedPackages(packages: Seq[Seq[File]]): Seq[Seq[File]] = {
+    packages
+      .map(_.filterNot(_.getName.contains("$")))
+      .map(_.filterNot(_.getCanonicalPath.contains("akka")))
+      .map(_.filterNot(_.getCanonicalPath.contains("deploy")))
+      .map(_.filterNot(_.getCanonicalPath.contains("network")))
+      .map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
+      .map(_.filterNot(_.getCanonicalPath.contains("executor")))
+      .map(_.filterNot(_.getCanonicalPath.contains("python")))
+      .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+      .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+      .map(_.filterNot(_.getCanonicalPath.contains("sql/execution")))
+      .map(_.filterNot(_.getCanonicalPath.contains("sql/hive/test")))
+  }
+
   lazy val settings = scalaJavaUnidocSettings ++ Seq (
     publish := {},
 
@@ -368,22 +383,12 @@ object Unidoc {
     // Skip actual catalyst, but include the subproject.
     // Catalyst is not public API and contains quasiquotes which break scaladoc.
     unidocAllSources in (ScalaUnidoc, unidoc) := {
-      (unidocAllSources in (ScalaUnidoc, unidoc)).value
-        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+      ignoreUndocumentedPackages((unidocAllSources in (ScalaUnidoc, unidoc)).value)
     },
 
     // Skip class names containing $ and some internal packages in Javadocs
     unidocAllSources in (JavaUnidoc, unidoc) := {
-      (unidocAllSources in (JavaUnidoc, unidoc)).value
-        .map(_.filterNot(_.getName.contains("$")))
-        .map(_.filterNot(_.getCanonicalPath.contains("akka")))
-        .map(_.filterNot(_.getCanonicalPath.contains("deploy")))
-        .map(_.filterNot(_.getCanonicalPath.contains("network")))
-        .map(_.filterNot(_.getCanonicalPath.contains("shuffle")))
-        .map(_.filterNot(_.getCanonicalPath.contains("executor")))
-        .map(_.filterNot(_.getCanonicalPath.contains("python")))
-        .map(_.filterNot(_.getCanonicalPath.contains("collection")))
-        .map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+      ignoreUndocumentedPackages((unidocAllSources in (JavaUnidoc, unidoc)).value)
     },
 
     // Javadoc options: create a window title, and group key packages on index page

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f3aac08..46f5070 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -64,7 +64,7 @@ private[sql] object DataFrame {
  *   val people = sqlContext.parquetFile("...")
  *
  *   // Create a DataFrame from data sources
- *   val df =
+ *   val df = sqlContext.load("...", "json")
  * }}}
  *
  * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
@@ -80,9 +80,10 @@ private[sql] object DataFrame {
  * {{{
  *   // The following creates a new column that increases everybody's age by 10.
  *   people("age") + 10  // in Scala
+ *   people.col("age").plus(10);  // in Java
  * }}}
  *
- * A more concrete example:
+ * A more concrete example in Scala:
  * {{{
  *   // To create DataFrame using SQLContext
  *   val people = sqlContext.parquetFile("...")
@@ -94,6 +95,18 @@ private[sql] object DataFrame {
  *     .agg(avg(people("salary")), max(people("age")))
  * }}}
  *
+ * and in Java:
+ * {{{
+ *   // To create DataFrame using SQLContext
+ *   DataFrame people = sqlContext.parquetFile("...");
+ *   DataFrame department = sqlContext.parquetFile("...");
+ *
+ *   people.filter("age".gt(30))
+ *     .join(department, people.col("deptId").equalTo(department("id")))
+ *     .groupBy(department.col("name"), "gender")
+ *     .agg(avg(people.col("salary")), max(people.col("age")));
+ * }}}
+ *
  * @groupname basic Basic DataFrame functions
  * @groupname dfops Language Integrated Queries
  * @groupname rdd RDD Operations
@@ -102,7 +115,7 @@ private[sql] object DataFrame {
  */
 // TODO: Improve documentation.
 @Experimental
-class DataFrame protected[sql](
+class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
     @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution)
   extends RDDApi[Row] with Serializable {
@@ -295,12 +308,14 @@ class DataFrame protected[sql](
    *   1984  04    0.450090        0.483521
    * }}}
    * @param numRows Number of rows to show
-   * @group basic
+   *
+   * @group action
    */
   def show(numRows: Int): Unit = println(showString(numRows))
 
   /**
    * Displays the top 20 rows of [[DataFrame]] in a tabular form.
+   * @group action
    */
   def show(): Unit = show(20)
 
@@ -738,16 +753,19 @@ class DataFrame protected[sql](
 
   /**
    * Returns the first `n` rows.
+   * @group action
    */
   def head(n: Int): Array[Row] = limit(n).collect()
 
   /**
    * Returns the first row.
+   * @group action
    */
   def head(): Row = head(1).head
 
   /**
    * Returns the first row. Alias for head().
+   * @group action
    */
   override def first(): Row = head()
 
@@ -834,6 +852,11 @@ class DataFrame protected[sql](
   /**
    * @group basic
    */
+  override def cache(): this.type = persist()
+
+  /**
+   * @group basic
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     sqlContext.cacheManager.cacheQuery(this, None, newLevel)
     this
@@ -847,6 +870,11 @@ class DataFrame protected[sql](
     this
   }
 
+  /**
+   * @group basic
+   */
+  override def unpersist(): this.type = unpersist(blocking = false)
+
   /////////////////////////////////////////////////////////////////////////////
   // I/O
   /////////////////////////////////////////////////////////////////////////////
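
The scaladoc changes above add a Java variant of the DSL example and tag show()/head()/
first() with the "action" group; the hunk also adds explicit cache()/unpersist()
overrides. A hedged end-to-end usage sketch tying those entry points together (the JSON
path and app name below are stand-ins, not taken from this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object DataFrameDocExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("df-doc-example"))
        val sqlContext = new SQLContext(sc)

        // Create a DataFrame from a data source, as the updated scaladoc shows.
        val people = sqlContext.load("examples/src/main/resources/people.json", "json")

        // cache() is now an explicit override in DataFrame that delegates to persist().
        people.cache()

        // A language-integrated query followed by an action from the "action" group.
        people.filter(people("age") > 21).show()

        // unpersist() now delegates to unpersist(blocking = false).
        people.unpersist()
        sc.stop()
      }
    }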

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index df866fd..ba4373f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -29,13 +29,13 @@ import org.apache.spark.storage.StorageLevel
  */
 private[sql] trait RDDApi[T] {
 
-  def cache(): this.type = persist()
+  def cache(): this.type
 
   def persist(): this.type
 
   def persist(newLevel: StorageLevel): this.type
 
-  def unpersist(): this.type = unpersist(blocking = false)
+  def unpersist(): this.type
 
   def unpersist(blocking: Boolean): this.type
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index beb76f2..1778d39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -119,7 +119,8 @@ private[sql] case class JDBCRelation(
     url: String,
     table: String,
     parts: Array[Partition])(@transient val sqlContext: SQLContext)
-  extends PrunedFilteredScan {
+  extends BaseRelation
+  with PrunedFilteredScan {
 
   override val schema = JDBCRDD.resolveTable(url, table)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index f9d0ba2..b645199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -90,7 +90,10 @@ private[sql] case class JSONRelation(
     samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
     @transient val sqlContext: SQLContext)
-  extends TableScan with InsertableRelation {
+  extends BaseRelation
+  with TableScan
+  with InsertableRelation {
+
   // TODO: Support partitioned JSON relation.
   private def baseRDD = sqlContext.sparkContext.textFile(path)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 8d95858..234e6bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -159,7 +159,8 @@ private[sql] case class ParquetRelation2(
     maybeSchema: Option[StructType] = None,
     maybePartitionSpec: Option[PartitionSpec] = None)(
     @transient val sqlContext: SQLContext)
-  extends CatalystScan
+  extends BaseRelation
+  with CatalystScan
   with InsertableRelation
   with SparkHadoopMapReduceUtil
   with Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 0c4b706..a046a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.sources
 
 import org.apache.spark.annotation.{Experimental, DeveloperApi}
@@ -90,12 +91,6 @@ trait CreatableRelationProvider {
    * existing data is expected to be overwritten by the contents of the DataFrame.
     * ErrorIfExists mode means that when saving a DataFrame to a data source,
     * if data already exists, an exception is expected to be thrown.
-    *
-    * @param sqlContext
-    * @param mode
-    * @param parameters
-    * @param data
-    * @return
     */
   def createRelation(
       sqlContext: SQLContext,
@@ -138,7 +133,7 @@ abstract class BaseRelation {
  * A BaseRelation that can produce all of its tuples as an RDD of Row objects.
  */
 @DeveloperApi
-trait TableScan extends BaseRelation {
+trait TableScan {
   def buildScan(): RDD[Row]
 }
 
@@ -148,7 +143,7 @@ trait TableScan extends BaseRelation {
  * containing all of its tuples as Row objects.
  */
 @DeveloperApi
-trait PrunedScan extends BaseRelation {
+trait PrunedScan {
   def buildScan(requiredColumns: Array[String]): RDD[Row]
 }
 
@@ -162,25 +157,11 @@ trait PrunedScan extends BaseRelation {
  * as filtering partitions based on a bloom filter.
  */
 @DeveloperApi
-trait PrunedFilteredScan extends BaseRelation {
+trait PrunedFilteredScan {
+  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
 }
 
 /**
- * ::Experimental::
- * An interface for experimenting with a more direct connection to the query planner.  Compared to
- * [[PrunedFilteredScan]], this operator receives the raw expressions from the
- * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].  Unlike the other APIs this
- * interface is not designed to be binary compatible across releases and thus should only be used
- * for experimentation.
- */
-@Experimental
-trait CatalystScan extends BaseRelation {
-  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
-}
-
-@DeveloperApi
-/**
  * ::DeveloperApi::
  * A BaseRelation that can be used to insert data into it through the insert method.
  * If overwrite in insert method is true, the old data in the relation should be overwritten with
@@ -196,6 +177,20 @@ trait CatalystScan extends BaseRelation {
  * If a data source needs to check the actual nullability of a field, it needs to do it in the
  * insert method.
  */
-trait InsertableRelation extends BaseRelation {
+@DeveloperApi
+trait InsertableRelation {
   def insert(data: DataFrame, overwrite: Boolean): Unit
 }
+
+/**
+ * ::Experimental::
+ * An interface for experimenting with a more direct connection to the query planner.  Compared to
+ * [[PrunedFilteredScan]], this operator receives the raw expressions from the
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].  Unlike the other APIs this
+ * interface is NOT designed to be binary compatible across releases and thus should only be used
+ * for experimentation.
+ */
+@Experimental
+trait CatalystScan {
+  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
+}
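
For reference, a hedged sketch of a relation written against the reshaped traits above.
RangeRelation is a made-up name; the pattern (extend BaseRelation, mix in
PrunedFilteredScan, honour requiredColumns and apply the pushed-down filters on a
best-effort basis) mirrors SimpleFilteredScan in FilteredScanSuite further down:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Illustrative relation with columns (i, doubled). Column pruning is honoured via
    // requiredColumns; a pushed-down GreaterThan("i", _) filter narrows the scanned range.
    // Filters are advisory: Spark re-evaluates them, so ignoring the rest is still correct.
    case class RangeRelation(from: Int, to: Int)(@transient val sqlContext: SQLContext)
      extends BaseRelation
      with PrunedFilteredScan {

      override def schema: StructType =
        StructType(
          StructField("i", IntegerType, nullable = false) ::
          StructField("doubled", IntegerType, nullable = false) :: Nil)

      override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
        val lowerBound = filters.collect { case GreaterThan("i", v: Int) => v + 1 }
          .foldLeft(from)(math.max)

        sqlContext.sparkContext.parallelize(lowerBound to to).map { i =>
          Row.fromSeq(requiredColumns.toSeq.map {
            case "i" => i
            case "doubled" => i * 2
          })
        }
      }
    }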

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 0ec756b..54af50c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -29,7 +29,7 @@ class DDLScanSource extends RelationProvider {
 }
 
 case class SimpleDDLScan(from: Int, to: Int)(@transient val sqlContext: 
SQLContext)
-  extends TableScan {
+  extends BaseRelation with TableScan {
 
   override def schema =
     StructType(Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 41cd356..ffeccf0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -32,7 +32,8 @@ class FilteredScanSource extends RelationProvider {
 }
 
 case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: 
SQLContext)
-  extends PrunedFilteredScan {
+  extends BaseRelation
+  with PrunedFilteredScan {
 
   override def schema =
     StructType(

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a33cf11..08fb538 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -31,7 +31,8 @@ class PrunedScanSource extends RelationProvider {
 }
 
 case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: 
SQLContext)
-  extends PrunedScan {
+  extends BaseRelation
+  with PrunedScan {
 
   override def schema =
     StructType(

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 0a4d4b6..7928600 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -33,7 +33,7 @@ class SimpleScanSource extends RelationProvider {
 }
 
 case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: 
SQLContext)
-  extends TableScan {
+  extends BaseRelation with TableScan {
 
   override def schema =
     StructType(StructField("i", IntegerType, nullable = false) :: Nil)
@@ -51,10 +51,11 @@ class AllDataTypesScanSource extends SchemaRelationProvider 
{
 }
 
 case class AllDataTypesScan(
-  from: Int,
-  to: Int,
-  userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
-  extends TableScan {
+    from: Int,
+    to: Int,
+    userSpecifiedSchema: StructType)(@transient val sqlContext: SQLContext)
+  extends BaseRelation
+  with TableScan {
 
   override def schema = userSpecifiedSchema
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 86fc654..fe86bd2 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -777,7 +777,8 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 }
 
-object HiveMetastoreTypes {
+
+private[hive] object HiveMetastoreTypes {
   protected val ddlParser = new DDLParser(HiveQl.parseSql(_))
 
   def toDataType(metastoreType: String): DataType = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index a547bab..a0c91cb 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.MetastoreRelation
 
 /**
- * :: Experimental ::
  * Create table and insert the query result into it.
  * @param database the database name of the new relation
  * @param tableName the table name of the new relation
@@ -38,7 +37,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  * @param desc the CreateTableDesc, which may contain serde, storage handler etc.
 
  */
-@Experimental
+private[hive]
 case class CreateTableAsSelect(
     database: String,
     tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index 07b5a84..d0510aa 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -29,11 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
 import org.apache.spark.sql.SQLContext
 
 /**
- * :: DeveloperApi ::
- *
  * Implementation for "describe [extended] table".
  */
-@DeveloperApi
+private[hive]
 case class DescribeHiveTableCommand(
     table: MetastoreRelation,
     override val output: Seq[Attribute],

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
index 781a2e9..9636da2 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala
@@ -17,17 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.types.StringType
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class HiveNativeCommand(sql: String) extends RunnableCommand {
 
   override def output =

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index b56175f..5b3cf28 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -26,21 +26,19 @@ import org.apache.hadoop.hive.serde2.objectinspector._
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.{BooleanType, DataType}
 
 /**
- * :: DeveloperApi ::
  * The Hive table scan operator.  Column and partition pruning are both handled.
  *
  * @param requestedAttributes Attributes to be fetched from the Hive table.
  * @param relation The Hive table to be scanned.
  * @param partitionPruningPred An optional partition pruning predicate for partitioned table.
  */
-@DeveloperApi
+private[hive]
 case class HiveTableScan(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 91af35f..ba5c8e0 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -32,7 +32,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, 
JobConf}
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
@@ -41,10 +40,7 @@ import org.apache.spark.sql.hive.{ ShimFileSinkDesc => 
FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c54fbb6..0c9aee3 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -21,15 +21,12 @@ import java.io.{BufferedReader, InputStreamReader}
 import java.io.{DataInputStream, DataOutputStream, EOFException}
 import java.util.Properties
 
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.AbstractSerDe
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.objectinspector._
-import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
-import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
 import org.apache.spark.sql.execution._
@@ -38,19 +35,14 @@ import org.apache.spark.sql.hive.{HiveContext, 
HiveInspectors}
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.util.Utils
 
-
-/* Implicit conversions */
-import scala.collection.JavaConversions._
-
 /**
- * :: DeveloperApi ::
  * Transforms the input by forking and running the specified script.
  *
  * @param input the set of expression that should be passed to the script.
  * @param script the command that should be executed.
  * @param output the attributes that are produced by the script.
  */
-@DeveloperApi
+private[hive]
 case class ScriptTransformation(
     input: Seq[Expression],
     script: String,
@@ -175,6 +167,7 @@ case class ScriptTransformation(
 /**
  * The wrapper class of Hive input and output schema properties
  */
+private[hive]
 case class HiveScriptIOSchema (
     inputRowFormat: Seq[(String, String)],
     outputRowFormat: Seq[(String, String)],

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 36bd3f8..63ad145 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.util._
@@ -30,14 +29,13 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.StructType
 
 /**
- * :: DeveloperApi ::
  * Analyzes the given table in the current database to generate statistics, which will be
  * used in query optimizations.
  *
  * Right now, it only supports Hive tables and it only updates the size of a Hive table
  * in the Hive metastore.
  */
-@DeveloperApi
+private[hive]
 case class AnalyzeTable(tableName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
@@ -47,10 +45,9 @@ case class AnalyzeTable(tableName: String) extends 
RunnableCommand {
 }
 
 /**
- * :: DeveloperApi ::
  * Drops a table from the metastore and removes it if it is cached.
  */
-@DeveloperApi
+private[hive]
 case class DropTable(
     tableName: String,
     ifExists: Boolean) extends RunnableCommand {
@@ -75,10 +72,7 @@ case class DropTable(
   }
 }
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class AddJar(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
@@ -89,10 +83,7 @@ case class AddJar(path: String) extends RunnableCommand {
   }
 }
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class AddFile(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
@@ -103,10 +94,7 @@ case class AddFile(path: String) extends RunnableCommand {
   }
 }
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class CreateMetastoreDataSource(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
@@ -146,10 +134,7 @@ case class CreateMetastoreDataSource(
   }
 }
 
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
+private[hive]
 case class CreateMetastoreDataSourceAsSelect(
     tableName: String,
     provider: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala
deleted file mode 100644
index 4989c42..0000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-/**
- * Physical execution operators used for running queries against data stored 
in Hive.  These
- * are not intended for use by users, but are documents so that it is easier 
to understand
- * the output of EXPLAIN queries.
- */
-package object execution

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
new file mode 100644
index 0000000..7ff5719
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetTest
+import org.apache.spark.sql.{QueryTest, SQLConf}
+
+case class Cases(lower: String, UPPER: String)
+
+class HiveParquetSuite extends QueryTest with ParquetTest {
+  val sqlContext = TestHive
+
+  import sqlContext._
+
+  def run(prefix: String): Unit = {
+    test(s"$prefix: Case insensitive attribute names") {
+      withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), 
"cases") {
+        val expected = (1 to 4).map(i => Row(i.toString))
+        checkAnswer(sql("SELECT upper FROM cases"), expected)
+        checkAnswer(sql("SELECT LOWER FROM cases"), expected)
+      }
+    }
+
+    test(s"$prefix: SELECT on Parquet table") {
+      val data = (1 to 4).map(i => (i, s"val_$i"))
+      withParquetTable(data, "t") {
+        checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
+      }
+    }
+
+    test(s"$prefix: Simple column projection + filter on Parquet table") {
+      withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
+        checkAnswer(
+          sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
+          Seq(Row(true, "val_2"), Row(true, "val_4")))
+      }
+    }
+
+    test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
+      withTempPath { dir =>
+        sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
+        parquetFile(dir.getCanonicalPath).registerTempTable("p")
+        withTempTable("p") {
+          checkAnswer(
+            sql("SELECT * FROM src ORDER BY key"),
+            sql("SELECT * from p ORDER BY key").collect().toSeq)
+        }
+      }
+    }
+
+    test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
+      withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
+        withTempPath { file =>
+          sql("SELECT * FROM t LIMIT 
1").saveAsParquetFile(file.getCanonicalPath)
+          parquetFile(file.getCanonicalPath).registerTempTable("p")
+          withTempTable("p") {
+            // let's do three overwrites for good measure
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
+            checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM 
t").collect().toSeq)
+          }
+        }
+      }
+    }
+  }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+    run("Parquet data source enabled")
+  }
+
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
+    run("Parquet data source disabled")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
new file mode 100644
index 0000000..1904f5f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -0,0 +1,767 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.types._
+
+// The data where the partitioning key exists only in the directory structure.
+case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
+case class StructContainer(intStructField :Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+/**
+ * A suite to test the automatic conversion of metastore tables with parquet data to use the
+ * built in parquet support.
+ */
+class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(s"""
+      create external table partitioned_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDir.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      create external table normal_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${new File(normalTableDir, "normal").getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD 
PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION 
(p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, 
"b":"str$i"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, 
null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
+
+    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE partitioned_parquet")
+    sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
+    sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
+    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+  }
+
+  test(s"conversion is working") {
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: HiveTableScan => true
+      }.isEmpty)
+    assert(
+      sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+        case _: ParquetTableScan => true
+        case _: PhysicalRDD => true
+      }.nonEmpty)
+  }
+}
+
+class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      """
+        |create table test_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    sql("DROP TABLE IF EXISTS test_parquet")
+
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+
+  test("scan an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan an empty parquet table with upper case") {
+    checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+  }
+
+  test("insert into an empty parquet table") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    // Insert into an empty table.
+    sql("insert into table test_insert_parquet select a, b from jt where jt.a 
> 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE 
intField < 8"),
+      Row(6, "str6") :: Row(7, "str7") :: Nil
+    )
+    // Insert overwrite.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where 
jt.a < 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE 
intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+    // Create it again.
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+    // Insert overwrite an empty table.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where 
jt.a < 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE 
intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    // Insert into the table.
+    sql("insert into table test_insert_parquet select a, b from jt")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet"),
+      (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, 
s"str$i"))
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("scan a parquet table created through a CTAS statement") {
+    sql(
+      """
+        |create table test_parquet_ctas ROW FORMAT
+        |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        |AS select * from jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
+      Seq(Row(1, "str1"))
+    )
+
+    table("test_parquet_ctas").queryExecution.analyzed match {
+      case LogicalRelation(p: ParquetRelation2) => // OK
+      case _ =>
+        fail(
+          s"test_parquet_ctas should be converted to 
${classOf[ParquetRelation2].getCanonicalName}")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet_ctas")
+  }
+
+  test("MetastoreRelation in InsertIntoTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the 
SparkPlan." +
+        s"However, found a ${o.toString} ")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE 
test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the 
SparkPlan." +
+        s"However, found a ${o.toString} ")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+}
+
+class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: execution.InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be 
${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE 
test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  // TODO: enable it after the fix of SPARK-5950.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM 
jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: execution.InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be 
${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+}
+
+/**
+ * A suite of tests for the Parquet support through the data sources API.
+ */
+class ParquetSourceSuiteBase extends ParquetPartitioningTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql( s"""
+      create temporary table partitioned_parquet
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDir.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      create temporary table partitioned_parquet_with_key
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKey.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      create temporary table normal_parquet
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
+  }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => 
s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions. So, the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // Due to the bug described in SPARK-6016, we cache two outdated footers for df1. Then,
+    // since the new table has four parquet files, we try to read new footers from two files
+    // and merge the metadata in the footers of these four (two outdated ones and two latest ones),
+    // which will cause an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
+}
+
+class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+
+  test("values in arrays and maps stored in parquet are always nullable") {
+    val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
+    val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
+    val arrayType1 = ArrayType(IntegerType, containsNull = false)
+    val expectedSchema1 =
+      StructType(
+        StructField("m", mapType1, nullable = true) ::
+        StructField("a", arrayType1, nullable = true) :: Nil)
+    assert(df.schema === expectedSchema1)
+
+    df.saveAsTable("alwaysNullable", "parquet")
+
+    val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
+    val arrayType2 = ArrayType(IntegerType, containsNull = true)
+    val expectedSchema2 =
+      StructType(
+        StructField("m", mapType2, nullable = true) ::
+          StructField("a", arrayType2, nullable = true) :: Nil)
+
+    assert(table("alwaysNullable").schema === expectedSchema2)
+
+    checkAnswer(
+      sql("SELECT m, a FROM alwaysNullable"),
+      Row(Map(2 -> 3), Seq(4, 5, 6)))
+
+    sql("DROP TABLE alwaysNullable")
+  }
+}
+
+class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+}
+
+/**
+ * A collection of tests for parquet data with various forms of partitioning.
+ */
+abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
+  var partitionedTableDir: File = null
+  var normalTableDir: File = null
+  var partitionedTableDirWithKey: File = null
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
+
+  override def beforeAll(): Unit = {
+    partitionedTableDir = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDir.delete()
+    partitionedTableDir.mkdir()
+
+    normalTableDir = File.createTempFile("parquettests", "sparksql")
+    normalTableDir.delete()
+    normalTableDir.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDir, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetData(i, s"part-$p"))
+        .toDF()
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    sparkContext
+      .makeRDD(1 to 10)
+      .map(i => ParquetData(i, s"part-1"))
+      .toDF()
+      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+
+    partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .toDF()
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+  }
+
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
+    test(s"ordering of the partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+        Seq.fill(10)(Row(1, "part-1"))
+      )
+
+      checkAnswer(
+        sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+        Seq.fill(10)(Row("part-1", 1))
+      )
+    }
+
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        Row(1, 10) ::
+          Row(2, 10) ::
+          Row(3, 10) ::
+          Row(4, 10) ::
+          Row(5, 10) ::
+          Row(6, 10) ::
+          Row(7, 10) ::
+          Row(8, 10) ::
+          Row(9, 10) ::
+          Row(10, 10) :: Nil
+      )
+    }
+
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
+        Row("part-1", 1, 10) ::
+          Row("part-2", 2, 10) ::
+          Row("part-3", 3, 10) ::
+          Row("part-4", 4, 10) ::
+          Row("part-5", 5, 10) ::
+          Row("part-6", 6, 10) ::
+          Row("part-7", 7, 10) ::
+          Row("part-8", 8, 10) ::
+          Row("part-9", 9, 10) ::
+          Row("part-10", 10, 10) :: Nil
+      )
+    }
+
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        Row(100))
+    }
+
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        Row(10))
+    }
+
+    test(s"non-existent partition $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+        Row(0))
+    }
+
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        Row(30))
+    }
+
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        Row(30))
+    }
+
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p 
= 1"),
+        Row(1 + 2 + 3))
+    }
+
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
+  }
+
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, 
structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
+  test("non-part select(*)") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_parquet"),
+      Row(10))
+  }
+}
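
As a rough standalone sketch (not part of the commit; the context name and path are
assumptions), the data source tables registered in ParquetSourceSuiteBase above amount to
pointing org.apache.spark.sql.parquet at a directory whose p=<value> subdirectories were
written with saveAsParquetFile, after which the directory names surface as the partition
column `p`:

    // Sketch only: assumes a HiveContext named `hiveContext` and a directory
    // /tmp/partitioned_parquet containing p=1/ ... p=10/, each written with
    // saveAsParquetFile as in ParquetPartitioningTest.beforeAll().
    hiveContext.sql("""
      CREATE TEMPORARY TABLE partitioned_parquet
      USING org.apache.spark.sql.parquet
      OPTIONS (path '/tmp/partitioned_parquet')
    """)

    // `p` comes from the directory names, so this query prunes to a single partition.
    hiveContext.sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1").collect()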

http://git-wip-us.apache.org/repos/asf/spark/blob/54d19689/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
deleted file mode 100644
index e89b448..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.spark.sql.{SQLConf, QueryTest}
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.hive.test.TestHive
-
-case class Cases(lower: String, UPPER: String)
-
-class HiveParquetSuite extends QueryTest with ParquetTest {
-  val sqlContext = TestHive
-
-  import sqlContext._
-
-  def run(prefix: String): Unit = {
-    test(s"$prefix: Case insensitive attribute names") {
-      withParquetTable((1 to 4).map(i => Cases(i.toString, i.toString)), "cases") {
-        val expected = (1 to 4).map(i => Row(i.toString))
-        checkAnswer(sql("SELECT upper FROM cases"), expected)
-        checkAnswer(sql("SELECT LOWER FROM cases"), expected)
-      }
-    }
-
-    test(s"$prefix: SELECT on Parquet table") {
-      val data = (1 to 4).map(i => (i, s"val_$i"))
-      withParquetTable(data, "t") {
-        checkAnswer(sql("SELECT * FROM t"), data.map(Row.fromTuple))
-      }
-    }
-
-    test(s"$prefix: Simple column projection + filter on Parquet table") {
-      withParquetTable((1 to 4).map(i => (i % 2 == 0, i, s"val_$i")), "t") {
-        checkAnswer(
-          sql("SELECT `_1`, `_3` FROM t WHERE `_1` = true"),
-          Seq(Row(true, "val_2"), Row(true, "val_4")))
-      }
-    }
-
-    test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
-      withTempPath { dir =>
-        sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).registerTempTable("p")
-        withTempTable("p") {
-          checkAnswer(
-            sql("SELECT * FROM src ORDER BY key"),
-            sql("SELECT * from p ORDER BY key").collect().toSeq)
-        }
-      }
-    }
-
-    test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
-      withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
-        withTempPath { file =>
-          sql("SELECT * FROM t LIMIT 
1").saveAsParquetFile(file.getCanonicalPath)
-          parquetFile(file.getCanonicalPath).registerTempTable("p")
-          withTempTable("p") {
-            // let's do three overwrites for good measure
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            sql("INSERT OVERWRITE TABLE p SELECT * FROM t")
-            checkAnswer(sql("SELECT * FROM p"), sql("SELECT * FROM 
t").collect().toSeq)
-          }
-        }
-      }
-    }
-  }
-
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
-    run("Parquet data source enabled")
-  }
-
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
-    run("Parquet data source disabled")
-  }
-}
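
The deleted HiveParquetSuite above ran each test twice within a single class by flipping
SQLConf.PARQUET_USE_DATA_SOURCE_API with the withSQLConf helper it uses, while the new
parquetSuites.scala above splits the two settings into separate
ParquetDataSourceOnSourceSuite and ParquetDataSourceOffSourceSuite classes. A minimal
sketch of the old pattern, assuming the same test package and TestHive imports as the
suites above (the method name is an assumption):

    // Sketch only: `withSQLConf` temporarily sets a SQLConf key for the duration of the
    // block and restores the previous value afterwards.
    def runWithBothParquetPaths(run: String => Unit): Unit = {
      withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
        run("Parquet data source enabled")   // shared tests against the data sources API path
      }
      withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
        run("Parquet data source disabled")  // shared tests against the original Parquet path
      }
    }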

