Repository: spark
Updated Branches:
  refs/heads/branch-1.3 066301c65 -> 78f7edb85


http://git-wip-us.apache.org/repos/asf/spark/blob/78f7edb8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ddc7b18..87b380f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -22,26 +22,24 @@ import java.sql.Timestamp
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.Table
-import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
-import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
+import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
+import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.types._
 
 /**
@@ -244,6 +242,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override protected[sql] lazy val analyzer =
     new Analyzer(catalog, functionRegistry, caseSensitive = false) {
       override val extendedRules =
+        catalog.ParquetConversions ::
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
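
(Side note on the hunk above: the analyzer applies `extendedRules` in order, so prepending `catalog.ParquetConversions` lets the Parquet conversion run before `CreateTables` and `PreInsertionCasts` see the plan. Below is a minimal, self-contained Scala sketch of that ordering idea; `Plan`, `Relation`, `Converted`, and the rule values are hypothetical stand-ins, not Catalyst classes.)

// Minimal sketch of applying an ordered list of rewrite rules to a plan.
object RuleOrderingSketch {
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Converted(name: String) extends Plan

  type Rule = Plan => Plan

  // Stand-in for catalog.ParquetConversions: rewrites matching relations.
  val parquetConversions: Rule = {
    case Relation(n) if n.endsWith("_parquet") => Converted(n)
    case other => other
  }

  // Stand-in for a later rule that only sees the already-converted plan.
  val createTables: Rule = identity

  // Rules are applied left to right, so their position in the list matters.
  def analyze(plan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  def main(args: Array[String]): Unit =
    println(analyze(Relation("sales_parquet"), Seq(parquetConversions, createTables)))
}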

http://git-wip-us.apache.org/repos/asf/spark/blob/78f7edb8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index eb1ee54..6d794d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,25 +20,25 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
-import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
-
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
+import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
 import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
+import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
 import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -101,16 +101,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
-  /** *
-    * Creates a data source table (a table created with USING clause) in Hive's metastore.
-    * Returns true when the table has been created. Otherwise, false.
-    * @param tableName
-    * @param userSpecifiedSchema
-    * @param provider
-    * @param options
-    * @param isExternal
-    * @return
-    */
+  /**
+   * Creates a data source table (a table created with USING clause) in Hive's metastore.
+   * Returns true when the table has been created. Otherwise, false.
+   */
   def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
@@ -141,7 +135,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   def hiveDefaultTableFilePath(tableName: String): String = {
-    val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase())
+    val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
     hiveWarehouse.getTablePath(currentDatabase, tableName).toString
   }
 
@@ -176,25 +170,41 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           Nil
         }
 
-      val relation = MetastoreRelation(
-        databaseName, tblName, alias)(
-          table.getTTable, partitions.map(part => part.getTPartition))(hive)
-
-      if (hive.convertMetastoreParquet &&
-          hive.conf.parquetUseDataSourceApi &&
-          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet")) {
-        val metastoreSchema = StructType.fromAttributes(relation.output)
-        val paths = if (relation.hiveQlTable.isPartitioned) {
-          relation.hiveQlPartitions.map(p => p.getLocation)
-        } else {
-          Seq(relation.hiveQlTable.getDataLocation.toString)
-        }
+      MetastoreRelation(databaseName, tblName, alias)(
+        table.getTTable, partitions.map(part => part.getTPartition))(hive)
+    }
+  }
 
-        LogicalRelation(ParquetRelation2(
-          paths, Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
-      } else {
-        relation
+  private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+
+    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+    // serialize the Metastore schema to JSON and pass it as a data source option because of the
+    // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    if (metastoreRelation.hiveQlTable.isPartitioned) {
+      val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
+      val partitionColumnDataTypes = partitionSchema.map(_.dataType)
+      val partitions = metastoreRelation.hiveQlPartitions.map { p =>
+        val location = p.getLocation
+        val values = Row.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
+          case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
+        })
+        ParquetPartition(values, location)
       }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val paths = partitions.map(_.path)
+      LogicalRelation(
+        ParquetRelation2(
+          paths,
+          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
+          None,
+          Some(partitionSpec))(hive))
+    } else {
+      val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+      LogicalRelation(
+        ParquetRelation2(
+          paths,
+          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
     }
   }
 
@@ -261,9 +271,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
         tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
 
-        import org.apache.hadoop.mapred.TextInputFormat
         import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
         import org.apache.hadoop.io.Text
+        import org.apache.hadoop.mapred.TextInputFormat
 
         tbl.setInputFormatClass(classOf[TextInputFormat])
         tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
@@ -386,12 +396,55 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   /**
+   * When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
+   * data source relations for better performance.
+   *
+   * This rule can be considered as [[HiveStrategies.ParquetConversion]] done right.
+   */
+  object ParquetConversions extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      // Collects all `MetastoreRelation`s which should be replaced
+      val toBeReplaced = plan.collect {
+        // Write path
+        case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+            // Inserting into partitioned table is not supported in Parquet data source (yet).
+            if !relation.hiveQlTable.isPartitioned &&
+              hive.convertMetastoreParquet &&
+              hive.conf.parquetUseDataSourceApi &&
+              relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          relation
+
+        // Read path
+        case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
+            if hive.convertMetastoreParquet &&
+              hive.conf.parquetUseDataSourceApi &&
+              relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          relation
+      }
+
+      // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
+      // attribute IDs referenced in other nodes.
+      toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
+        val parquetRelation = convertToParquetRelation(relation)
+        val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
+
+        lastPlan.transformUp {
+          case r: MetastoreRelation if r == relation => parquetRelation
+          case other => other.transformExpressions {
+            case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Creates any tables required for query execution.
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
     import org.apache.hadoop.hive.ql.Context
-    import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+    import org.apache.hadoop.hive.ql.parse.{ASTNode, QB, SemanticAnalyzer}
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
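
(The `ParquetConversions` rule above follows a collect-then-fold pattern: collect the `MetastoreRelation`s to be replaced, then fold over the plan, substituting each relation and remapping attribute references through an `AttributeMap` so other nodes still resolve. The following is an illustrative, self-contained Scala sketch of that pattern using plain case classes; `Node`, `Ref`, `OldTable`, `NewTable`, and `Project` are hypothetical stand-ins, not Catalyst types.)

// Sketch: replace selected leaf nodes and rewrite references that pointed at them.
object ReplaceAndRewriteSketch {
  case class Ref(id: Int)
  sealed trait Node { def refs: Seq[Ref] }
  case class OldTable(id: Int) extends Node { def refs: Seq[Ref] = Seq(Ref(id)) }
  case class NewTable(id: Int) extends Node { def refs: Seq[Ref] = Seq(Ref(id)) }
  case class Project(refs: Seq[Ref], child: Node) extends Node

  def convert(plan: Node): Node = {
    // 1. Collect the nodes that should be replaced.
    val toReplace: Seq[OldTable] = plan match {
      case Project(_, t: OldTable) => Seq(t)
      case t: OldTable             => Seq(t)
      case _                       => Seq.empty
    }
    // 2. Fold over them, swapping in the replacement and fixing references.
    toReplace.foldLeft(plan) { (current, old) =>
      val replacement = NewTable(old.id + 100)
      val rewrite: Map[Ref, Ref] = old.refs.zip(replacement.refs).toMap
      current match {
        case Project(outRefs, `old`) =>
          Project(outRefs.map(r => rewrite.getOrElse(r, r)), replacement)
        case `old` => replacement
        case other => other
      }
    }
  }

  def main(args: Array[String]): Unit =
    // Prints Project(List(Ref(101)),NewTable(101)): the reference follows the new table.
    println(convert(Project(Seq(Ref(1)), OldTable(1))))
}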

http://git-wip-us.apache.org/repos/asf/spark/blob/78f7edb8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index cb138be..965d159 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -139,15 +139,19 @@ private[hive] trait HiveStrategies {
 
             val partitionLocations = partitions.map(_.getLocation)
 
-            hiveContext
-              .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
-              .addPartitioningAttributes(relation.partitionKeys)
-              .lowerCase
-              .where(unresolvedOtherPredicates)
-              .select(unresolvedProjection: _*)
-              .queryExecution
-              .executedPlan
-              .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+            if (partitionLocations.isEmpty) {
+              PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
+            } else {
+              hiveContext
+                .parquetFile(partitionLocations.head, partitionLocations.tail: _*)
+                .addPartitioningAttributes(relation.partitionKeys)
+                .lowerCase
+                .where(unresolvedOtherPredicates)
+                .select(unresolvedProjection: _*)
+                .queryExecution
+                .executedPlan
+                .fakeOutput(projectList.map(_.toAttribute)) :: Nil
+            }
           } else {
             hiveContext
               .parquetFile(relation.hiveQlTable.getDataLocation.toString)
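
(The guard added above matters because `partitionLocations.head` throws on an empty Seq; when a partitioned table has no partitions, the strategy now plans an empty PhysicalRDD instead. A small stand-alone Scala sketch of the same guard, with a hypothetical `scanFiles` standing in for `parquetFile`:)

// Sketch: guard a head/tail varargs fan-out against an empty location list.
object EmptyPartitionGuardSketch {
  def scanFiles(first: String, rest: String*): Seq[String] = first +: rest

  def scanPartitions(partitionLocations: Seq[String]): Seq[String] =
    if (partitionLocations.isEmpty) {
      Seq.empty // analogous to returning an empty RDD for the scan
    } else {
      scanFiles(partitionLocations.head, partitionLocations.tail: _*)
    }

  def main(args: Array[String]): Unit = {
    println(scanPartitions(Nil))                           // List()
    println(scanPartitions(Seq("/data/p=1", "/data/p=2"))) // both paths
  }
}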

http://git-wip-us.apache.org/repos/asf/spark/blob/78f7edb8/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index e246cbb..2acf1a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -40,7 +40,7 @@ case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
  * built in parquet support.
  */
-class ParquetMetastoreSuite extends ParquetPartitioningTest {
+class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -97,6 +97,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 
   override def afterAll(): Unit = {
+    sql("DROP TABLE partitioned_parquet")
+    sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE normal_parquet")
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -113,10 +116,38 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   }
 }
 
+class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+}
+
+class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+}
+
 /**
  * A suite of tests for the Parquet support through the data sources API.
  */
-class ParquetSourceSuite extends ParquetPartitioningTest {
+class ParquetSourceSuiteBase extends ParquetPartitioningTest {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -146,6 +177,34 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
   }
 }
 
+class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+}
+
+class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
+  val originalConf = conf.parquetUseDataSourceApi
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+  }
+}
+
 /**
  * A collection of tests for parquet data with various forms of partitioning.
  */
@@ -191,107 +250,99 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
-  def run(prefix: String): Unit = {
-    Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table 
=>
-      test(s"$prefix: ordering of the partitioning columns $table") {
-        checkAnswer(
-          sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
-          Seq.fill(10)(Row(1, "part-1"))
-        )
-
-        checkAnswer(
-          sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
-          Seq.fill(10)(Row("part-1", 1))
-        )
-      }
-
-      test(s"$prefix: project the partitioning column $table") {
-        checkAnswer(
-          sql(s"SELECT p, count(*) FROM $table group by p"),
-          Row(1, 10) ::
-            Row(2, 10) ::
-            Row(3, 10) ::
-            Row(4, 10) ::
-            Row(5, 10) ::
-            Row(6, 10) ::
-            Row(7, 10) ::
-            Row(8, 10) ::
-            Row(9, 10) ::
-            Row(10, 10) :: Nil
-        )
-      }
-
-      test(s"$prefix: project partitioning and non-partitioning columns 
$table") {
-        checkAnswer(
-          sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
-          Row("part-1", 1, 10) ::
-            Row("part-2", 2, 10) ::
-            Row("part-3", 3, 10) ::
-            Row("part-4", 4, 10) ::
-            Row("part-5", 5, 10) ::
-            Row("part-6", 6, 10) ::
-            Row("part-7", 7, 10) ::
-            Row("part-8", 8, 10) ::
-            Row("part-9", 9, 10) ::
-            Row("part-10", 10, 10) :: Nil
-        )
-      }
-
-      test(s"$prefix: simple count $table") {
-        checkAnswer(
-          sql(s"SELECT COUNT(*) FROM $table"),
-          Row(100))
-      }
-
-      test(s"$prefix: pruned count $table") {
-        checkAnswer(
-          sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
-          Row(10))
-      }
-
-      test(s"$prefix: non-existent partition $table") {
-        checkAnswer(
-          sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
-          Row(0))
-      }
-
-      test(s"$prefix: multi-partition pruned count $table") {
-        checkAnswer(
-          sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
-          Row(30))
-      }
-
-      test(s"$prefix: non-partition predicates $table") {
-        checkAnswer(
-          sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
-          Row(30))
-      }
-
-      test(s"$prefix: sum $table") {
-        checkAnswer(
-          sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND 
p = 1"),
-          Row(1 + 2 + 3))
-      }
-
-      test(s"$prefix: hive udfs $table") {
-        checkAnswer(
-          sql(s"SELECT concat(stringField, stringField) FROM $table"),
-          sql(s"SELECT stringField FROM $table").map {
-            case Row(s: String) => Row(s + s)
-          }.collect().toSeq)
-      }
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"ordering of the partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
+        Seq.fill(10)(Row(1, "part-1"))
+      )
+
+      checkAnswer(
+        sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
+        Seq.fill(10)(Row("part-1", 1))
+      )
+    }
+
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        Row(1, 10) ::
+          Row(2, 10) ::
+          Row(3, 10) ::
+          Row(4, 10) ::
+          Row(5, 10) ::
+          Row(6, 10) ::
+          Row(7, 10) ::
+          Row(8, 10) ::
+          Row(9, 10) ::
+          Row(10, 10) :: Nil
+      )
+    }
+
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, 
stringField"),
+        Row("part-1", 1, 10) ::
+          Row("part-2", 2, 10) ::
+          Row("part-3", 3, 10) ::
+          Row("part-4", 4, 10) ::
+          Row("part-5", 5, 10) ::
+          Row("part-6", 6, 10) ::
+          Row("part-7", 7, 10) ::
+          Row("part-8", 8, 10) ::
+          Row("part-9", 9, 10) ::
+          Row("part-10", 10, 10) :: Nil
+      )
+    }
+
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        Row(100))
     }
 
-    test(s"$prefix: $prefix: non-part select(*)") {
+    test(s"pruned count $table") {
       checkAnswer(
-        sql("SELECT COUNT(*) FROM normal_parquet"),
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
         Row(10))
     }
-  }
 
-  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
-  run("Parquet data source enabled")
+    test(s"non-existent partition $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
+        Row(0))
+    }
+
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        Row(30))
+    }
+
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        Row(30))
+    }
 
-  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
-  run("Parquet data source disabled")
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p 
= 1"),
+        Row(1 + 2 + 3))
+    }
+
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
+  }
+
+  test("non-part select(*)") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_parquet"),
+      Row(10))
+  }
 }
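
(The suite refactoring above keeps the shared tests in base classes and runs them twice through thin On/Off subclasses that flip SQLConf.PARQUET_USE_DATA_SOURCE_API in beforeAll/afterAll and restore the captured original value. A self-contained Scala sketch of that toggle-and-restore pattern follows; the `Conf` object and method names are stand-ins, not Spark or ScalaTest APIs.)

// Sketch: one base suite, two subclasses that run it with a flag on and off.
object SuiteToggleSketch {
  object Conf {
    var parquetUseDataSourceApi: Boolean = true
  }

  abstract class ParquetSuiteBase {
    def beforeAll(): Unit = ()
    def afterAll(): Unit = ()
    def sharedTests(): Unit =
      println(s"running shared tests with useDataSourceApi=${Conf.parquetUseDataSourceApi}")
    final def run(): Unit = { beforeAll(); sharedTests(); afterAll() }
  }

  class DataSourceOnSuite extends ParquetSuiteBase {
    private val original = Conf.parquetUseDataSourceApi // captured before the flip
    override def beforeAll(): Unit = Conf.parquetUseDataSourceApi = true
    override def afterAll(): Unit = Conf.parquetUseDataSourceApi = original
  }

  class DataSourceOffSuite extends ParquetSuiteBase {
    private val original = Conf.parquetUseDataSourceApi
    override def beforeAll(): Unit = Conf.parquetUseDataSourceApi = false
    override def afterAll(): Unit = Conf.parquetUseDataSourceApi = original
  }

  def main(args: Array[String]): Unit = {
    new DataSourceOnSuite().run()
    new DataSourceOffSuite().run()
  }
}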

