Repository: spark Updated Branches: refs/heads/branch-2.0 ada319844 -> 36045106d
[SPARK-15553][SQL] Dataset.createTempView should use CreateViewCommand ## What changes were proposed in this pull request? Let `Dataset.createTempView` and `Dataset.createOrReplaceTempView` use `CreateViewCommand`, rather than calling `SparkSession.createTempView`. Besides, this patch also removes `SparkSession.createTempView`. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh <sim...@tw.ibm.com> Closes #13327 from viirya/dataset-createtempview. (cherry picked from commit f1b220eeeed1d4d12121fe0b3b175da44488da68) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36045106 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36045106 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36045106 Branch: refs/heads/branch-2.0 Commit: 36045106d43b3952c55bae4439dbc86892399b3c Parents: ada3198 Author: Liang-Chi Hsieh <sim...@tw.ibm.com> Authored: Fri May 27 21:24:08 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Fri May 27 21:24:14 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 5 +++++ .../scala/org/apache/spark/sql/Dataset.scala | 23 +++++++++++++++----- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../org/apache/spark/sql/SparkSession.scala | 11 ---------- .../spark/sql/execution/SparkSqlParser.scala | 18 +++++++-------- .../spark/sql/execution/command/cache.scala | 3 +-- .../spark/sql/execution/command/views.scala | 8 +++++-- 7 files changed, 39 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 4a073d1..77731b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -50,6 +50,11 @@ case class CatalogStorageFormat( compressed: Boolean, serdeProperties: Map[String, String]) +object CatalogStorageFormat { + /** Empty storage format for default values and copies. */ + val EmptyStorageFormat = CatalogStorageFormat(locationUri = None, inputFormat = None, + outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty) +} /** * A column in a table. http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index abd16f2..7aeec20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} -import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython @@ -2329,8 +2330,14 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @throws[AnalysisException] - def createTempView(viewName: String): Unit = { - sparkSession.createTempView(viewName, toDF(), replaceIfExists = false) + def createTempView(viewName: String): Unit = withPlan { + val tableDesc = CatalogTable( + identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), + tableType = CatalogTableType.VIEW, + schema = Seq.empty[CatalogColumn], + storage = CatalogStorageFormat.EmptyStorageFormat) + CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false, + isTemporary = true, sql = "") } /** @@ -2340,8 +2347,14 @@ class Dataset[T] private[sql]( * @group basic * @since 2.0.0 */ - def createOrReplaceTempView(viewName: String): Unit = { - sparkSession.createTempView(viewName, toDF(), replaceIfExists = true) + def createOrReplaceTempView(viewName: String): Unit = withPlan { + val tableDesc = CatalogTable( + identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), + tableType = CatalogTableType.VIEW, + schema = Seq.empty[CatalogColumn], + storage = CatalogStorageFormat.EmptyStorageFormat) + CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true, + isTemporary = true, sql = "") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 66d9aa2..af419fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -552,7 +552,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sparkSession.createTempView(tableName, df, replaceIfExists = true) + df.createOrReplaceTempView(tableName) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index c9276cf..20e22ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -583,17 +583,6 @@ class SparkSession private( Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent)) } - /** - * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to - * this [[SparkSession]]. - */ - private[sql] def createTempView( - viewName: String, df: DataFrame, replaceIfExists: Boolean) = { - sessionState.catalog.createTempView( - sessionState.sqlParser.parseTableIdentifier(viewName).table, - df.logicalPlan, replaceIfExists) - } - /* ----------------- * | Everything else | * ----------------- */ http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index cfebfc6..48fb95b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -902,8 +902,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) - .getOrElse(EmptyStorageFormat) - val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + .getOrElse(CatalogStorageFormat.EmptyStorageFormat) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) + .getOrElse(CatalogStorageFormat.EmptyStorageFormat) val location = Option(ctx.locationSpec).map(visitLocationSpec) // If we are creating an EXTERNAL table, then the LOCATION field is required if (external && location.isEmpty) { @@ -976,15 +977,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** Empty storage format for default values and copies. */ - private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) - /** * Create a [[CatalogStorageFormat]]. */ override def visitTableFileFormat( ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( inputFormat = Option(string(ctx.inFmt)), outputFormat = Option(string(ctx.outFmt))) } @@ -997,7 +995,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val source = ctx.identifier.getText HiveSerDe.sourceToSerDe(source, conf) match { case Some(s) => - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( inputFormat = s.inputFormat, outputFormat = s.outputFormat, serde = s.serde) @@ -1037,7 +1035,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitRowFormatSerde( ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { import ctx._ - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( serde = Option(string(name)), serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) } @@ -1067,7 +1065,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx) "line.delim" -> value } - EmptyStorageFormat.copy(serdeProperties = entries.toMap) + CatalogStorageFormat.EmptyStorageFormat.copy(serdeProperties = entries.toMap) } /** @@ -1181,7 +1179,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { identifier = visitTableIdentifier(name), tableType = CatalogTableType.VIEW, schema = schema, - storage = EmptyStorageFormat, + storage = CatalogStorageFormat.EmptyStorageFormat, properties = properties, viewOriginalText = sql, viewText = sql, http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 31dc016..b1290a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -30,8 +30,7 @@ case class CacheTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => - sparkSession.createTempView( - tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true) + Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName) } sparkSession.catalog.cacheTable(tableName) http://git-wip-us.apache.org/repos/asf/spark/blob/36045106/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 8499011..6468916 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -57,8 +57,12 @@ case class CreateViewCommand( override def output: Seq[Attribute] = Seq.empty[Attribute] - require(tableDesc.tableType == CatalogTableType.VIEW) - require(tableDesc.viewText.isDefined) + require(tableDesc.tableType == CatalogTableType.VIEW, + "The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.") + if (!isTemporary) { + require(tableDesc.viewText.isDefined, + "The table to created with CREATE VIEW must have 'viewText'.") + } if (allowExisting && replace) { throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org