Repository: spark
Updated Branches:
  refs/heads/master 8ead999fd -> 6463e0b9e


[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api

With changes in this PR, users can persist metadata of tables created based on 
the data source API in metastore through DDLs.

Author: Yin Huai <yh...@databricks.com>
Author: Michael Armbrust <mich...@databricks.com>

Closes #3960 from yhuai/persistantTablesWithSchema2 and squashes the following 
commits:

069c235 [Yin Huai] Make exception messages user friendly.
c07cbc6 [Yin Huai] Get the location of test file in a correct way.
4456e98 [Yin Huai] Test data.
5315dfc [Yin Huai] rxin's comments.
7fc4b56 [Yin Huai] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on 
the data source API.
aeaf4b3 [Yin Huai] Add comments.
06f9b0c [Yin Huai] Revert unnecessary changes.
feb88aa [Yin Huai] Merge remote-tracking branch 'apache/master' into 
persistantTablesWithSchema2
172db80 [Yin Huai] Fix unit test.
49bf1ac [Yin Huai] Unit tests.
8f8f1a1 [Yin Huai] [SPARK-4574][SQL] Adding support for defining schema in 
foreign DDL commands. #3431
f47fda1 [Yin Huai] Unit tests.
2b59723 [Michael Armbrust] Set external when creating tables
c00bb1b [Michael Armbrust] Don't use reflection to read options
1ea6e7b [Michael Armbrust] Don't fail when trying to uncache a table that 
doesn't exist
6edc710 [Michael Armbrust] Add tests.
d7da491 [Michael Armbrust] First draft of persistent tables.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6463e0b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6463e0b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6463e0b9

Branch: refs/heads/master
Commit: 6463e0b9e8067cce70602c5c9006a2546856a9d6
Parents: 8ead999
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Jan 13 13:01:27 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Jan 13 13:01:27 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   1 +
 .../spark/sql/execution/SparkStrategies.scala   |  14 ++
 .../apache/spark/sql/execution/commands.scala   |   3 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  53 ++--
 .../apache/spark/sql/sources/interfaces.scala   |  17 +-
 .../spark/sql/sources/TableScanSuite.scala      |  30 +++
 .../org/apache/spark/sql/hive/HiveContext.scala |  12 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  79 +++++-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  11 +
 .../org/apache/spark/sql/hive/TestHive.scala    |   1 +
 .../spark/sql/hive/execution/commands.scala     |  21 ++
 sql/hive/src/test/resources/sample.json         |   2 +
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 244 +++++++++++++++++++
 .../sql/hive/execution/HiveQuerySuite.scala     |   1 -
 14 files changed, 461 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6c575dd..e7021cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
       DataSourceStrategy ::
+      DDLStrategy ::
       TakeOrdered ::
       HashAggregation ::
       LeftSemiJoin ::

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 99b6611..d91b1fb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  object DDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, 
options) =>
+        ExecutedCommand(
+          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, 
options)) :: Nil
+
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, 
options) =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a 
HiveContext instead.")
+
+      case _ => Nil
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 0d765c4..df8e616 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -177,7 +177,6 @@ case class DescribeCommand(
     override val output: Seq[Attribute]) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    Row("# Registered as a temporary table", null, null) +:
-      child.output.map(field => Row(field.name, field.dataType.toString, null))
+    child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index fe2c4d8..f8741e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers 
with PackratParsers wi
   protected lazy val ddl: Parser[LogicalPlan] = createTable
 
   /**
-   * `CREATE TEMPORARY TABLE avroTable
+   * `CREATE [TEMPORARY] TABLE avroTable
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    * or
-   * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    */
   protected lazy val createTable: Parser[LogicalPlan] =
   (
-    CREATE ~ TEMPORARY ~ TABLE ~> ident
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
       ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
-      case tableName ~ columns ~ provider ~ opts =>
+      case temp ~ tableName ~ columns ~ provider ~ opts =>
         val userSpecifiedSchema = columns.flatMap(fields => 
Some(StructType(fields)))
-        CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+        CreateTableUsing(tableName, userSpecifiedSchema, provider, 
temp.isDefined, opts)
     }
   )
 
@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers 
with PackratParsers wi
     primitiveType
 }
 
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
     val loader = Utils.getContextOrSparkClassLoader
     val clazz: Class[_] = try loader.loadClass(provider) catch {
       case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
               
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
               .createRelation(sqlContext, new CaseInsensitiveMap(options), 
schema)
           case _ =>
-            sys.error(s"${clazz.getCanonicalName} should extend 
SchemaRelationProvider.")
+            sys.error(s"${clazz.getCanonicalName} does not allow 
user-specified schemas.")
         }
       }
       case None => {
         clazz.newInstance match {
-          case dataSource: org.apache.spark.sql.sources.RelationProvider  =>
+          case dataSource: org.apache.spark.sql.sources.RelationProvider =>
             dataSource
               .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
               .createRelation(sqlContext, new CaseInsensitiveMap(options))
           case _ =>
-            sys.error(s"${clazz.getCanonicalName} should extend 
RelationProvider.")
+            sys.error(s"A schema needs to be specified when using 
${clazz.getCanonicalName}.")
         }
       }
     }
 
-    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+    new ResolvedDataSource(clazz, relation)
+  }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: 
BaseRelation)
+
+private[sql] case class CreateTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String]) extends Command
+
+private [sql] case class CreateTempTableUsing(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String])  extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, 
provider, options)
+
+    
sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 990f7e0..2a7be23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, 
Attribute}
 /**
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data 
source.  When
- * Spark SQL is given a DDL operation with a USING clause specified, this 
interface is used to
- * pass in the parameters specified by a user.
+ * Spark SQL is given a DDL operation with a USING clause specified (to 
specify the implemented
+ * RelationProvider), this interface is used to pass in the parameters 
specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source.  
When that class is
  * not found Spark SQL will append the class name `DefaultSource` to the path, 
allowing for
@@ -46,10 +46,10 @@ trait RelationProvider {
 
 /**
  * ::DeveloperApi::
- * Implemented by objects that produce relations for a specific kind of data 
source.  When
- * Spark SQL is given a DDL operation with
- * 1. USING clause: to specify the implemented SchemaRelationProvider
- * 2. User defined schema: users can define schema optionally when create table
+ * Implemented by objects that produce relations for a specific kind of data 
source
+ * with a given schema.  When Spark SQL is given a DDL operation with a USING 
clause specified (
+ * to specify the implemented SchemaRelationProvider) and a user defined 
schema, this interface
+ * is used to pass in the parameters specified by a user.
  *
  * Users may specify the fully qualified class name of a given data source.  
When that class is
  * not found Spark SQL will append the class name `DefaultSource` to the path, 
allowing for
@@ -57,6 +57,11 @@ trait RelationProvider {
  * data source 'org.apache.spark.sql.json.DefaultSource'
  *
  * A new instance of this class with be instantiated each time a DDL call is 
made.
+ *
+ * The difference between a [[RelationProvider]] and a 
[[SchemaRelationProvider]] is that
+ * users need to provide a schema when using a SchemaRelationProvider.
+ * A relation provider can inherits both [[RelationProvider]] and 
[[SchemaRelationProvider]]
+ * if it can support both schema inference and user-specified schemas.
  */
 @DeveloperApi
 trait SchemaRelationProvider {

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 605190f..a1d2468 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
       sql("SELECT * FROM oneToTenDef"),
       (1 to 10).map(Row(_)).toSeq)
   }
+
+  test("exceptions") {
+    // Make sure we do throw correct exception when users use a relation 
provider that
+    // only implements the RelationProvier or the SchemaRelationProvider.
+    val schemaNotAllowed = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified 
schemas"))
+
+    val schemaNeeded = intercept[Exception] {
+      sql(
+        """
+          |CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
+          |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10'
+          |)
+        """.stripMargin)
+    }
+    assert(schemaNeeded.getMessage.contains("A schema needs to be specified 
when using"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 02eac43..09ff4cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -115,6 +115,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) 
{
     catalog.createTable("default", tableName, 
ScalaReflection.attributesFor[A], allowExisting)
   }
 
+  def refreshTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.refreshTable("default", tableName)
+  }
+
+  protected[hive] def invalidateTable(tableName: String): Unit = {
+    // TODO: Database support...
+    catalog.invalidateTable("default", tableName)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, 
which will be
    * used in query optimizations.
@@ -340,6 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
+      HiveDDLStrategy,
+      DDLStrategy,
       TakeOrdered,
       ParquetOperations,
       InMemoryScans,

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c25288e..daeabb6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
+
 import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => 
TPartition}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => 
TPartition, FieldSchema}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, 
HiveException}
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
@@ -39,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
@@ -50,8 +52,76 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
+  /** A fully qualified identifier for a table (i.e., database.tableName) */
+  case class QualifiedTableName(database: String, name: String) {
+    def toLowerCase = QualifiedTableName(database.toLowerCase, 
name.toLowerCase)
+  }
+
+  /** A cache of Spark SQL data source tables that have been accessed. */
+  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
+    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
+      override def load(in: QualifiedTableName): LogicalPlan = {
+        logDebug(s"Creating new cached data source for $in")
+        val table = client.getTable(in.database, in.name)
+        val schemaString = table.getProperty("spark.sql.sources.schema")
+        val userSpecifiedSchema =
+          if (schemaString == null) {
+            None
+          } else {
+            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+          }
+        // It does not appear that the ql client for the metastore has a way 
to enumerate all the
+        // SerDe properties directly...
+        val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
+
+        val resolvedRelation =
+          ResolvedDataSource(
+            hive,
+            userSpecifiedSchema,
+            table.getProperty("spark.sql.sources.provider"),
+            options)
+
+        LogicalRelation(resolvedRelation.relation)
+      }
+    }
+
+    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
+  }
+
+  def refreshTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, 
tableName).toLowerCase)
+  }
+
+  def invalidateTable(databaseName: String, tableName: String): Unit = {
+    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, 
tableName).toLowerCase)
+  }
+
   val caseSensitive: Boolean = false
 
+  def createDataSourceTable(
+      tableName: String,
+      userSpecifiedSchema: Option[StructType],
+      provider: String,
+      options: Map[String, String]) = {
+    val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
+    val tbl = new Table(dbName, tblName)
+
+    tbl.setProperty("spark.sql.sources.provider", provider)
+    if (userSpecifiedSchema.isDefined) {
+      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+    }
+    options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
+
+    tbl.setProperty("EXTERNAL", "TRUE")
+    tbl.setTableType(TableType.EXTERNAL_TABLE)
+
+    // create the table
+    synchronized {
+      client.createTable(tbl, false)
+    }
+  }
+
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
@@ -72,7 +142,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
     val table = client.getTable(databaseName, tblName)
-    if (table.isView) {
+
+    if (table.getProperty("spark.sql.sources.provider") != null) {
+      cachedDataSourceTables(QualifiedTableName(databaseName, 
tblName).toLowerCase)
+    } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
       HiveQl.createPlanForView(table, alias)

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c439b9e..cdff82e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.sources.CreateTableUsing
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
 
 import scala.collection.JavaConversions._
@@ -208,6 +209,16 @@ private[hive] trait HiveStrategies {
     }
   }
 
+  object HiveDDLStrategy extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, 
options) =>
+        ExecutedCommand(
+          CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, 
options)) :: Nil
+
+      case _ => Nil
+    }
+  }
+
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case describe: DescribeCommand =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 1358a0e..31c7ce9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -395,6 +395,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
 
       clearCache()
       loadedTables.clear()
+      catalog.cachedDataSourceTables.invalidateAll()
       catalog.client.getAllTables("default").foreach { t =>
         logDebug(s"Deleting table $t")
         val table = catalog.client.getTable("default", t)

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 6b733a2..e70cdea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.SQLContext
@@ -52,6 +53,12 @@ case class DropTable(
   override def run(sqlContext: SQLContext) = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+    try {
+      hiveContext.tryUncacheQuery(hiveContext.table(tableName))
+    } catch {
+      case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+    }
+    hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
     hiveContext.catalog.unregisterTable(Seq(tableName))
     Seq.empty[Row]
@@ -85,3 +92,17 @@ case class AddFile(path: String) extends RunnableCommand {
     Seq.empty[Row]
   }
 }
+
+case class CreateMetastoreDataSource(
+    tableName: String,
+    userSpecifiedSchema: Option[StructType],
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, 
provider, options)
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/test/resources/sample.json
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/sample.json 
b/sql/hive/src/test/resources/sample.json
new file mode 100644
index 0000000..a2c2ffd
--- /dev/null
+++ b/sql/hive/src/test/resources/sample.json
@@ -0,0 +1,2 @@
+{"a" : "2" ,"b" : "blah", "c_!@(3)":1}
+{"<d>" : {"d!" : [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}}

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
new file mode 100644
index 0000000..ec9ebb4
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql._
+import org.apache.spark.util.Utils
+
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+/**
+ * Tests for persisting tables created though the data sources API into the 
metastore.
+ */
+class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
+  override def afterEach(): Unit = {
+    reset()
+  }
+
+  val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+
+  test ("persistent JSON table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+  }
+
+  test ("persistent JSON table with a user specified schema") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (
+        |a string,
+        |b String,
+        |`c_!@(3)` int,
+        |`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
+      sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM 
expectedJsonTable").collect().toSeq)
+  }
+
+  test ("persistent JSON table with a user specified schema with a subset of 
fields") {
+    // This works because JSON objects are self-describing and JSONRelation 
can get needed
+    // field values based on field names.
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: 
boolean>>>, b String)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    val innerStruct = StructType(
+      StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, 
true) :: Nil))) :: Nil)
+    val expectedSchema = StructType(
+      StructField("<d>", innerStruct, true) ::
+      StructField("b", StringType, true) :: Nil)
+
+    assert(expectedSchema == table("jsonTable").schema)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT b, `<d>`.`=` FROM jsonTable"),
+      sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
+  }
+
+  test("resolve shortened provider names") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+  }
+
+  test("drop table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      jsonFile(filePath).collect().toSeq)
+
+    sql("DROP TABLE jsonTable")
+
+    intercept[Exception] {
+      sql("SELECT * FROM jsonTable").collect()
+    }
+  }
+
+  test("check change without refresh") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: 
Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a1", "b1", "c1") :: 
Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    // Schema is cached so the new column does not show. The updated values in 
existing columns
+    // will show.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a1", "b1") :: Nil)
+
+    refreshTable("jsonTable")
+
+    // Check that the refresh worked
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a1", "b1", "c1") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+
+  test("drop, change, recreate") {
+    val tempDir = File.createTempFile("sparksql", "json")
+    tempDir.delete()
+    sparkContext.parallelize(("a", "b") :: 
Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b") :: Nil)
+
+    FileUtils.deleteDirectory(tempDir)
+    sparkContext.parallelize(("a", "b", "c") :: 
Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath)
+
+    sql("DROP TABLE jsonTable")
+
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '${tempDir.getCanonicalPath}'
+        |)
+      """.stripMargin)
+
+    // New table should reflect new schema.
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      ("a", "b", "c") :: Nil)
+    FileUtils.deleteDirectory(tempDir)
+  }
+
+  test("invalidate cache and reload") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable (`c_!@(3)` int)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    jsonFile(filePath).registerTempTable("expectedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+    // Discard the cached relation.
+    invalidateTable("jsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
+
+    invalidateTable("jsonTable")
+    val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) 
:: Nil)
+
+    assert(expectedSchema == table("jsonTable").schema)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6463e0b9/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 700a45e..4decd15 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -623,7 +623,6 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
 
     assertResult(
       Array(
-        Array("# Registered as a temporary table", null, null),
         Array("a", "IntegerType", null),
         Array("b", "StringType", null))
     ) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to