This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new da84f81b1e2 [SPARK-44355][SQL] Move WithCTE into command queries da84f81b1e2 is described below commit da84f81b1e2ed9386cc5534141f53273ea4cb57d Author: Max Gekk <max.g...@gmail.com> AuthorDate: Tue Jul 25 21:53:18 2023 +0800 [SPARK-44355][SQL] Move WithCTE into command queries This PR is based on https://github.com/apache/spark/pull/42022 to fix tests, as the PR author is on vacation. ### What changes were proposed in this pull request? In the PR, I propose to add a new trait `CTEInChildren` and mix it into some commands that should have `WithCTE` on top of their children (queries) instead of the main query. Also I modified the `CTESubstitution` rule and removed special handling of `Command`s and similar nodes. After the changes, `Command`, `ParsedStatement` and `InsertIntoDir` are handled in the same way as other queries by referring to CTE Defs. The only difference is that `WithCTE` is not placed on the top of the main query [...] Closes #41922 ### Why are the changes needed? To improve code maintenance. Right now the CTE resolution code path has diverged: queries with commands go into the CTE inline code path, whereas non-command queries go into the CTEDef code path. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the new test: ``` $ build/sbt "test:testOnly *InsertSuite" ``` Closes #42036 from cloud-fan/help. 
Lead-authored-by: Max Gekk <max.g...@gmail.com> Co-authored-by: Wenchen Fan <wenc...@databricks.com> Co-authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CTESubstitution.scala | 85 ++++++++---- .../plans/logical/basicLogicalOperators.scala | 12 +- .../sql/catalyst/plans/logical/statements.scala | 2 +- .../sql/catalyst/plans/logical/v2Commands.scala | 23 +++- .../org/apache/spark/sql/internal/SQLConf.scala | 9 ++ .../sql/execution/command/DataWritingCommand.scala | 4 +- .../command/InsertIntoDataSourceDirCommand.scala | 8 +- .../execution/command/createDataSourceTables.scala | 8 +- .../spark/sql/execution/command/tables.scala | 6 +- .../apache/spark/sql/execution/command/views.scala | 14 +- .../datasources/InsertIntoDataSourceCommand.scala | 8 +- .../datasources/SaveIntoDataSourceCommand.scala | 8 +- .../ansi/double-quoted-identifiers-enabled.sql.out | 14 +- .../sql-tests/analyzer-results/cte-command.sql.out | 152 +++++++++++++++++++++ .../analyzer-results/postgreSQL/with.sql.out | 12 +- .../resources/sql-tests/inputs/cte-command.sql | 33 +++++ .../sql-tests/results/cte-command.sql.out | 121 ++++++++++++++++ .../execution/CreateHiveTableAsSelectCommand.scala | 8 +- 18 files changed, 473 insertions(+), 54 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 4e3234f9c0d..cd174f71892 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -20,18 +20,18 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, 
CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy} /** * Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending * on the conditions below: - * 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE - * definitions, i.e., inline CTEs. + * 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs. * 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not * inline will be made later by the rule `InlineCTE` after query analysis. * @@ -46,42 +46,62 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega * dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A, * A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an * analysis exception will be thrown later by relation resolving rules. + * + * If the query is a SQL command or DML statement (extends `CTEInChildren`), + * place `WithCTE` into their children. 
*/ object CTESubstitution extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { if (!plan.containsPattern(UNRESOLVED_WITH)) { return plan } - val isCommand = plan.exists { - case _: Command | _: ParsedStatement | _: InsertIntoDir => true - case _ => false + + val commands = plan.collect { + case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c } + val forceInline = if (commands.length == 1) { + if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) { + // The legacy behavior always inlines the CTE relations for queries in commands. + true + } else { + // If there is only one command and it's `CTEInChildren`, we can resolve + // CTE normally and don't need to force inline. + !commands.head.isInstanceOf[CTEInChildren] + } + } else if (commands.length > 1) { + // This can happen with the multi-insert statement. We should fall back to + // the legacy behavior. + true + } else { + false + } + val cteDefs = ArrayBuffer.empty[CTERelationDef] val (substituted, firstSubstituted) = LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => assertNoNameConflictsInCTE(plan) - traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) case LegacyBehaviorPolicy.LEGACY => (legacyTraverseAndSubstituteCTE(plan, cteDefs), None) case LegacyBehaviorPolicy.CORRECTED => - traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) } if (cteDefs.isEmpty) { substituted } else if (substituted eq firstSubstituted.get) { - WithCTE(substituted, cteDefs.toSeq) + withCTEDefs(substituted, cteDefs.toSeq) } else { var done = false substituted.resolveOperatorsWithPruning(_ => !done) { case p if p eq firstSubstituted.get => // `firstSubstituted` is the parent of all other CTEs (if any). 
done = true - WithCTE(p, cteDefs.toSeq) + withCTEDefs(p, cteDefs.toSeq) case p if p.children.count(_.containsPattern(CTE)) > 1 => // This is the first common parent of all CTEs. done = true - WithCTE(p, cteDefs.toSeq) + withCTEDefs(p, cteDefs.toSeq) } } } @@ -131,7 +151,7 @@ object CTESubstitution extends Rule[LogicalPlan] { plan.resolveOperatorsUp { case UnresolvedWith(child, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs) + resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs) substituteCTE(child, alwaysInline = true, resolvedCTERelations) } } @@ -168,7 +188,7 @@ object CTESubstitution extends Rule[LogicalPlan] { * SELECT * FROM t * ) * @param plan the plan to be traversed - * @param isCommand if this is a command + * @param forceInline always inline the CTE relations if this is true * @param outerCTEDefs already resolved outer CTE definitions with names * @param cteDefs all accumulated CTE definitions * @return the plan where CTE substitution is applied and optionally the last substituted `With` @@ -176,7 +196,7 @@ object CTESubstitution extends Rule[LogicalPlan] { */ private def traverseAndSubstituteCTE( plan: LogicalPlan, - isCommand: Boolean, + forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None @@ -184,11 +204,11 @@ object CTESubstitution extends Rule[LogicalPlan] { _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { case UnresolvedWith(child: LogicalPlan, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++ + resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++ outerCTEDefs val substituted = substituteCTE( - traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1, - 
isCommand, + traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1, + forceInline, resolvedCTERelations) if (firstSubstituted.isEmpty) { firstSubstituted = Some(substituted) @@ -206,10 +226,11 @@ object CTESubstitution extends Rule[LogicalPlan] { private def resolveCTERelations( relations: Seq[(String, SubqueryAlias)], isLegacy: Boolean, - isCommand: Boolean, + forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = { - var resolvedCTERelations = if (isLegacy || isCommand) { + val alwaysInline = isLegacy || forceInline + var resolvedCTERelations = if (alwaysInline) { Seq.empty } else { outerCTEDefs @@ -232,12 +253,12 @@ object CTESubstitution extends Rule[LogicalPlan] { // WITH t3 AS (SELECT * FROM t1) // ) // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`. - traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1 + traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1 } // CTE definition can reference a previous one - val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations) + val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations) val cteRelation = CTERelationDef(substituted) - if (!(isLegacy || isCommand)) { + if (!alwaysInline) { cteDefs += cteRelation } // Prepending new CTEs makes sure that those have higher priority over outer ones. 
@@ -249,7 +270,7 @@ object CTESubstitution extends Rule[LogicalPlan] { private def substituteCTE( plan: LogicalPlan, alwaysInline: Boolean, - cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = + cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = { plan.resolveOperatorsUpWithPruning( _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) { case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _) @@ -273,4 +294,22 @@ object CTESubstitution extends Rule[LogicalPlan] { e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations))) } } + } + + /** + * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its + * children. There are two reasons: + * 1. Some rules will pattern match the root command nodes, and we should keep command + * as the root node to not break them. + * 2. `Dataset` eagerly executes the commands inside a query plan. For example, + * sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just + * analyzing the command. However, the CTE references inside commands will be invalid if we + * execute the command alone, as the CTE definitions are outside of the command. 
+ */ + private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = { + p match { + case c: CTEInChildren => c.withCTEDefs(cteDefs) + case _ => WithCTE(p, cteDefs) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 57e4b31dbe0..efb7dbb44ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -684,7 +684,7 @@ case class InsertIntoDir( provider: Option[String], child: LogicalPlan, overwrite: Boolean = true) - extends UnaryNode { + extends UnaryNode with CTEInChildren { override def output: Seq[Attribute] = Seq.empty override def metadataOutput: Seq[Attribute] = Nil @@ -894,6 +894,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi } } +/** + * The logical node which is able to place the `WithCTE` node on its children. 
+ */ +trait CTEInChildren extends LogicalPlan { + def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(children.map(WithCTE(_, cteDefs))) + } +} + + case class WithWindowDefinition( windowDefinitions: Map[String, WindowSpecDefinition], child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 669750ee448..9efc3b13bc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]]. */ -abstract class ParsedStatement extends LogicalPlan { +abstract class ParsedStatement extends LogicalPlan with CTEInChildren { // Redact properties and options when parsed nodes are used by generic methods like toString override def productIterator: Iterator[Any] = super.productIterator.map { case mapArg: Map[_, _] => conf.redactOptions(mapArg) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 739ffa487e3..5c83da1a96a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command { /** * Base trait for DataSourceV2 write commands */ -trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery { +trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren { def 
table: NamedRelation def query: LogicalPlan def isByName: Boolean @@ -392,9 +392,16 @@ case class WriteDelta( } } -trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand { +trait V2CreateTableAsSelectPlan + extends V2CreateTablePlan + with AnalysisOnlyCommand + with CTEInChildren { def query: LogicalPlan + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs)) + } + override lazy val resolved: Boolean = childrenResolved && { // the table schema is created from the query schema, so the only resolution needed is to check // that the columns referenced by the table's partitioning exist in the query schema @@ -1234,12 +1241,16 @@ case class RepairTable( case class AlterViewAs( child: LogicalPlan, originalText: String, - query: LogicalPlan) extends BinaryCommand { + query: LogicalPlan) extends BinaryCommand with CTEInChildren { override def left: LogicalPlan = child override def right: LogicalPlan = query override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(Seq(child, WithCTE(query, cteDefs))) + } } /** @@ -1253,12 +1264,16 @@ case class CreateView( originalText: Option[String], query: LogicalPlan, allowExisting: Boolean, - replace: Boolean) extends BinaryCommand { + replace: Boolean) extends BinaryCommand with CTEInChildren { override def left: LogicalPlan = child override def right: LogicalPlan = query override protected def withNewChildrenInternal( newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = copy(child = newLeft, query = newRight) + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + withNewChildren(Seq(child, WithCTE(query, cteDefs))) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 00bb6f77ef3..011dbd10f5c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3766,6 +3766,15 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands") + .internal() + .doc("If true, always inline the CTE relations for the queries in commands. This is the " + + "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " + + "relation more than once, even if it's nondeterministic.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy") .internal() .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 338ce8cac42..592ae04a055 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, UnaryCommand} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import 
org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker @@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration /** * A special `Command` which writes data out and updates metrics. */ -trait DataWritingCommand extends UnaryCommand { +trait DataWritingCommand extends UnaryCommand with CTEInChildren { /** * The input query plan that produces the data to be written. * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index 35c8bec3716..67d38b28c83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ @@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand( storage: CatalogStorageFormat, provider: String, query: LogicalPlan, - overwrite: Boolean) extends LeafRunnableCommand { + overwrite: Boolean) extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -76,4 +76,8 @@ case class InsertIntoDataSourceDirCommand( Seq.empty[Row] } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3848d550515..54e8181c56c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -21,7 +21,7 @@ import java.net.URI import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.CommandExecutionMode @@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand( mode: SaveMode, query: LogicalPlan, outputColumnNames: Seq[String]) - extends LeafRunnableCommand { + extends LeafRunnableCommand with CTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -233,4 +233,8 @@ case class CreateDataSourceTableAsSelectCommand( throw ex } } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 351f6d5456d..88e940ffdc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -735,7 +735,7 @@ case class DescribeTableCommand( * 7. 
Common table expressions (CTEs) */ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) - extends DescribeCommandBase { + extends DescribeCommandBase with CTEInChildren { override val output = DescribeCommandSchema.describeTableAttributes() @@ -747,6 +747,10 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan) describeSchema(queryExecution.analyzed.schema, result, header = false) result.toSeq } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(plan = WithCTE(plan, cteDefs)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 3718794ea59..8ac982b9bdd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors @@ -69,7 +69,7 @@ case class CreateViewCommand( viewType: ViewType, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand { + extends RunnableCommand with 
AnalysisOnlyCommand with CTEInChildren { import ViewHelper._ @@ -215,6 +215,10 @@ case class CreateViewCommand( comment = comment ) } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(plan = WithCTE(plan, cteDefs)) + } } /** @@ -235,7 +239,7 @@ case class AlterViewAsCommand( query: LogicalPlan, isAnalyzed: Boolean = false, referredTempFunctions: Seq[String] = Seq.empty) - extends RunnableCommand with AnalysisOnlyCommand { + extends RunnableCommand with AnalysisOnlyCommand with CTEInChildren { import ViewHelper._ @@ -307,6 +311,10 @@ case class AlterViewAsCommand( session.sessionState.catalog.alterTable(updatedViewMeta) } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 789b1d714fc..128f6acdeaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.InsertableRelation @@ -31,7 +31,7 @@ case class InsertIntoDataSourceCommand( logicalRelation: LogicalRelation, query: LogicalPlan, overwrite: Boolean) - extends LeafRunnableCommand { + extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -47,4 +47,8 @@ case 
class InsertIntoDataSourceCommand( Seq.empty[Row] } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 666ae9b5c6f..5423232db42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.sources.CreatableRelationProvider @@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand( query: LogicalPlan, dataSource: CreatableRelationProvider, options: Map[String, String], - mode: SaveMode) extends LeafRunnableCommand { + mode: SaveMode) extends LeafRunnableCommand with CTEInChildren { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -68,4 +68,8 @@ case class SaveIntoDataSourceCommand( override def clone(): LogicalPlan = { SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode) } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out 
index 2a6bcce99d1..a155bccd091 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out @@ -410,11 +410,15 @@ CREATE TEMPORARY VIEW "myview"("c1") AS WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v" -- !query analysis CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v", false, false, LocalTempView, true - +- Project [a#x] - +- SubqueryAlias v - +- Project [1#x AS a#x] - +- Project [1 AS 1#x] - +- OneRowRelation + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [1#x AS a#x] + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [a#x] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [a#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out new file mode 100644 index 00000000000..93dab5aaa6d --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out @@ -0,0 +1,152 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query analysis +CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query analysis +CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * 
FROM s, false, false, LocalTempView, true + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_view +-- !query analysis +Project [col#x] ++- SubqueryAlias cte_view + +- View (`cte_view`, [col#x]) + +- Project [cast(col#x as int) AS col#x] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [42 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] ++- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [43 AS col#x] + : +- OneRowRelation + +- Project [col#x] + +- SubqueryAlias S + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s +-- !query analysis +InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] ++- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias s + : +- Project [44 AS col#x] + : +- 
OneRowRelation + +- Project [col#x] + +- SubqueryAlias s + +- CTERelationRef xxxx, true, [col#x] + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +CREATE TABLE cte_tbl2 (col INT) USING csv +-- !query analysis +CreateDataSourceTableCommand `spark_catalog`.`default`.`cte_tbl2`, false + + +-- !query +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col +-- !query analysis +Union false, false +:- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col] +: +- Project [col#x] +: +- SubqueryAlias s +: +- Project [45 AS col#x] +: +- OneRowRelation ++- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col] + +- Project [col#x] + +- SubqueryAlias s + +- Project [45 AS col#x] + +- OneRowRelation + + +-- !query +SELECT * FROM cte_tbl +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl + +- Relation spark_catalog.default.cte_tbl[col#x] csv + + +-- !query +SELECT * FROM cte_tbl2 +-- !query analysis +Project [col#x] ++- SubqueryAlias spark_catalog.default.cte_tbl2 + +- Relation spark_catalog.default.cte_tbl2[col#x] csv + + +-- !query +DROP TABLE cte_tbl +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl + + +-- !query +DROP 
TABLE cte_tbl2 +-- !query analysis +DropTable false, false ++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl2 diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out index e53480e96be..f58f8faa0be 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out @@ -452,10 +452,14 @@ with test as (select 42) insert into test select * from test -- !query analysis InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/test], Append, `spark_catalog`.`default`.`test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/test), [i] +- Project [cast(42#x as int) AS i#x] - +- Project [42#x] - +- SubqueryAlias test - +- Project [42 AS 42#x] - +- OneRowRelation + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias test + : +- Project [42 AS 42#x] + : +- OneRowRelation + +- Project [42#x] + +- SubqueryAlias test + +- CTERelationRef xxxx, true, [42#x] -- !query diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql new file mode 100644 index 00000000000..ee90c2de49e --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql @@ -0,0 +1,33 @@ +-- WITH inside CTAS +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s; + +SELECT * FROM cte_tbl; + +-- WITH inside CREATE VIEW +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s; + +SELECT * FROM cte_view; + +-- INSERT inside WITH +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S; + +SELECT * FROM cte_tbl; + +-- WITH inside INSERT +INSERT INTO
cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s; + +SELECT * FROM cte_tbl; + +CREATE TABLE cte_tbl2 (col INT) USING csv; +-- Multi-INSERT +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col; + +SELECT * FROM cte_tbl; +SELECT * FROM cte_tbl2; + +DROP TABLE cte_tbl; +DROP TABLE cte_tbl2; diff --git a/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out new file mode 100644 index 00000000000..67ac321a195 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out @@ -0,0 +1,121 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct<col:int> +-- !query output +42 + + +-- !query +CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_view +-- !query schema +struct<col:int> +-- !query output +42 + + +-- !query +WITH s AS (SELECT 43 AS col) +INSERT INTO cte_tbl SELECT * FROM S +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct<col:int> +-- !query output +42 +43 + + +-- !query +INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct<col:int> +-- !query output +42 +43 +44 + + +-- !query +CREATE TABLE cte_tbl2 (col INT) USING csv +-- !query schema +struct<> +-- !query output + + + +-- !query +WITH s AS (SELECT 45 AS col) +FROM s +INSERT INTO cte_tbl SELECT col +INSERT INTO cte_tbl2 SELECT col +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cte_tbl +-- !query schema +struct<col:int> +-- !query output +42 +43 
+44 +45 + + +-- !query +SELECT * FROM cte_tbl2 +-- !query schema +struct<col:int> +-- !query output +45 + + +-- !query +DROP TABLE cte_tbl +-- !query schema +struct<> +-- !query output + + + +-- !query +DROP TABLE cte_tbl2 +-- !query schema +struct<> +-- !query output + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 4127e7c75d7..eef2ae1f737 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -21,7 +21,7 @@ import scala.util.control.NonFatal import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand} @@ -38,7 +38,7 @@ case class CreateHiveTableAsSelectCommand( query: LogicalPlan, outputColumnNames: Seq[String], mode: SaveMode) - extends LeafRunnableCommand { + extends LeafRunnableCommand with CTEInChildren { assert(query.resolved) override def innerChildren: Seq[LogicalPlan] = query :: Nil @@ -111,4 +111,8 @@ case class CreateHiveTableAsSelectCommand( s"[Database: ${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}]" } + + override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = { + copy(query = WithCTE(query, cteDefs)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org