Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ada319844 -> 36045106d


[SPARK-15553][SQL] Dataset.createTempView should use CreateViewCommand

## What changes were proposed in this pull request?

Let `Dataset.createTempView` and `Dataset.createOrReplaceTempView` use
`CreateViewCommand` rather than calling `SparkSession.createTempView`. Since it
no longer has any callers, this patch also removes the private
`SparkSession.createTempView` helper.
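
As a quick sketch of the user-facing behavior this refactoring preserves (the session setup and view name below are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.range(10).toDF("id")

df.createTempView("nums")            // now runs CreateViewCommand(replace = false)
// df.createTempView("nums")         // a second call would throw AnalysisException
df.createOrReplaceTempView("nums")   // runs CreateViewCommand(replace = true)
spark.sql("SELECT count(*) FROM nums").show()
```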

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <sim...@tw.ibm.com>

Closes #13327 from viirya/dataset-createtempview.

(cherry picked from commit f1b220eeeed1d4d12121fe0b3b175da44488da68)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36045106
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36045106
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36045106

Branch: refs/heads/branch-2.0
Commit: 36045106d43b3952c55bae4439dbc86892399b3c
Parents: ada3198
Author: Liang-Chi Hsieh <sim...@tw.ibm.com>
Authored: Fri May 27 21:24:08 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri May 27 21:24:14 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  5 +++++
 .../scala/org/apache/spark/sql/Dataset.scala    | 23 +++++++++++++++-----
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../org/apache/spark/sql/SparkSession.scala     | 11 ----------
 .../spark/sql/execution/SparkSqlParser.scala    | 18 +++++++--------
 .../spark/sql/execution/command/cache.scala     |  3 +--
 .../spark/sql/execution/command/views.scala     |  8 +++++--
 7 files changed, 39 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4a073d1..77731b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -50,6 +50,11 @@ case class CatalogStorageFormat(
     compressed: Boolean,
     serdeProperties: Map[String, String])
 
+object CatalogStorageFormat {
+  /** Empty storage format for default values and copies. */
+  val EmptyStorageFormat = CatalogStorageFormat(locationUri = None, inputFormat = None,
+    outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty)
+}
 
 /**
  * A column in a table.

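Hoisting `EmptyStorageFormat` into the `CatalogStorageFormat` companion object lets callers outside the parser derive a storage format by copying only the fields they need. A minimal sketch (the Hive format class names are illustrative, not from this patch):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Start from the shared empty format and override only the known fields.
val textFormat = CatalogStorageFormat.EmptyStorageFormat.copy(
  inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
  outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
```
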
http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index abd16f2..7aeec20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2329,8 +2330,14 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   @throws[AnalysisException]
-  def createTempView(viewName: String): Unit = {
-    sparkSession.createTempView(viewName, toDF(), replaceIfExists = false)
+  def createTempView(viewName: String): Unit = withPlan {
+    val tableDesc = CatalogTable(
+      identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
+      tableType = CatalogTableType.VIEW,
+      schema = Seq.empty[CatalogColumn],
+      storage = CatalogStorageFormat.EmptyStorageFormat)
+    CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false,
+      isTemporary = true, sql = "")
   }
 
   /**
@@ -2340,8 +2347,14 @@ class Dataset[T] private[sql](
    * @group basic
    * @since 2.0.0
    */
-  def createOrReplaceTempView(viewName: String): Unit = {
-    sparkSession.createTempView(viewName, toDF(), replaceIfExists = true)
+  def createOrReplaceTempView(viewName: String): Unit = withPlan {
+    val tableDesc = CatalogTable(
+      identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
+      tableType = CatalogTableType.VIEW,
+      schema = Seq.empty[CatalogColumn],
+      storage = CatalogStorageFormat.EmptyStorageFormat)
+    CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true,
+      isTemporary = true, sql = "")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 66d9aa2..af419fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -552,7 +552,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    sparkSession.createTempView(tableName, df, replaceIfExists = true)
+    df.createOrReplaceTempView(tableName)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index c9276cf..20e22ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -583,17 +583,6 @@ class SparkSession private(
     Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
   }
 
-  /**
-   * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
-   * this [[SparkSession]].
-   */
-  private[sql] def createTempView(
-      viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
-    sessionState.catalog.createTempView(
-      sessionState.sqlParser.parseTableIdentifier(viewName).table,
-      df.logicalPlan, replaceIfExists)
-  }
-
   /* ----------------- *
    |  Everything else  |
    * ----------------- */

http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index cfebfc6..48fb95b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -902,8 +902,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
-      .getOrElse(EmptyStorageFormat)
-    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
+      .getOrElse(CatalogStorageFormat.EmptyStorageFormat)
+    val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+      .getOrElse(CatalogStorageFormat.EmptyStorageFormat)
     val location = Option(ctx.locationSpec).map(visitLocationSpec)
     // If we are creating an EXTERNAL table, then the LOCATION field is required
     if (external && location.isEmpty) {
@@ -976,15 +977,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
   }
 
-  /** Empty storage format for default values and copies. */
-  private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)
-
   /**
    * Create a [[CatalogStorageFormat]].
    */
   override def visitTableFileFormat(
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
-    EmptyStorageFormat.copy(
+    CatalogStorageFormat.EmptyStorageFormat.copy(
       inputFormat = Option(string(ctx.inFmt)),
       outputFormat = Option(string(ctx.outFmt)))
   }
@@ -997,7 +995,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val source = ctx.identifier.getText
     HiveSerDe.sourceToSerDe(source, conf) match {
       case Some(s) =>
-        EmptyStorageFormat.copy(
+        CatalogStorageFormat.EmptyStorageFormat.copy(
           inputFormat = s.inputFormat,
           outputFormat = s.outputFormat,
           serde = s.serde)
@@ -1037,7 +1035,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitRowFormatSerde(
       ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
     import ctx._
-    EmptyStorageFormat.copy(
+    CatalogStorageFormat.EmptyStorageFormat.copy(
       serde = Option(string(name)),
+      serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
@@ -1067,7 +1065,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
             ctx)
           "line.delim" -> value
         }
-    EmptyStorageFormat.copy(serdeProperties = entries.toMap)
+    CatalogStorageFormat.EmptyStorageFormat.copy(serdeProperties = entries.toMap)
   }
 
   /**
@@ -1181,7 +1179,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       identifier = visitTableIdentifier(name),
       tableType = CatalogTableType.VIEW,
       schema = schema,
-      storage = EmptyStorageFormat,
+      storage = CatalogStorageFormat.EmptyStorageFormat,
       properties = properties,
       viewOriginalText = sql,
       viewText = sql,

http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 31dc016..b1290a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -30,8 +30,7 @@ case class CacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      sparkSession.createTempView(
-        tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true)
+      Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
     }
     sparkSession.catalog.cacheTable(tableName)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 8499011..6468916 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -57,8 +57,12 @@ case class CreateViewCommand(
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 
-  require(tableDesc.tableType == CatalogTableType.VIEW)
-  require(tableDesc.viewText.isDefined)
+  require(tableDesc.tableType == CatalogTableType.VIEW,
+    "The type of the table created with CREATE VIEW must be 'CatalogTableType.VIEW'.")
+  if (!isTemporary) {
+    require(tableDesc.viewText.isDefined,
+      "The table created with CREATE VIEW must have 'viewText'.")
+  }
 
   if (allowExisting && replace) {
     throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")

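This relaxation is needed because the new `Dataset` code path builds a `CatalogTable` with no `viewText` and passes `sql = ""`: a temporary view created from a Dataset has no SQL text behind it. A hedged sketch of the command that path now constructs (the session setup and view name are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.execution.command.CreateViewCommand

val spark = SparkSession.builder().master("local").getOrCreate()
// A Dataset's analyzed input plan stands in for Dataset.logicalPlan here.
val somePlan = spark.range(1).queryExecution.logical

// viewText is left undefined, which the old blanket require would have rejected.
val tableDesc = CatalogTable(
  identifier = TableIdentifier("tmp_view"),
  tableType = CatalogTableType.VIEW,
  schema = Seq.empty[CatalogColumn],
  storage = CatalogStorageFormat.EmptyStorageFormat)
CreateViewCommand(tableDesc, somePlan, allowExisting = false, replace = true,
  isTemporary = true, sql = "")
```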
