This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7250941ab5b  [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
7250941ab5b is described below

commit 7250941ab5b8ea1c1dc720f2b6407404ac7020bb
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Sun Jun 11 21:31:03 2023 -0700

    [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values

    ### What changes were proposed in this pull request?

    This PR updates the SQL compiler to support general constant expressions in the syntax for
    CREATE/REPLACE TABLE OPTIONS values, rather than restricting the values to a few types of
    literals.

    * The analyzer now checks that the provided expressions are in fact `foldable`, and throws an
      error otherwise.
    * The error message that users encounter in these cases improves from a generic "syntax error
      at or near <location>" to one indicating that the syntax is valid, but that only constant
      expressions are supported in these contexts.

    ### Why are the changes needed?

    This makes it easier to provide OPTIONS lists in SQL, supporting use cases like concatenating
    strings with `||`.

    ### Does this PR introduce _any_ user-facing change?

    Yes, the SQL syntax changes.

    ### How was this patch tested?

    This PR adds new unit test coverage.

    Closes #41191 from dtenedor/expression-properties.

    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |  5 ++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     | 10 ++-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  1 +
 .../sql/catalyst/analysis/ResolveTableSpec.scala   | 90 ++++++++++++++++++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 69 ++++++++++-----
 .../sql/catalyst/plans/logical/v2Commands.scala    | 70 +++++++++++++--
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  6 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |  5 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 17 ++++
 .../CreateTablePartitioningValidationSuite.scala   | 18 +---
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 76 +++++++++--------
 .../org/apache/spark/sql/DataFrameWriter.scala     | 11 +--
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |  8 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 13 +--
 .../spark/sql/execution/SparkSqlParser.scala       | 19 ++++-
 .../datasources/v2/DataSourceV2Strategy.scala      | 13 +--
 .../apache/spark/sql/internal/CatalogImpl.scala    | 12 ++-
 .../spark/sql/streaming/DataStreamWriter.scala     |  5 +-
 .../sql/TableOptionsConstantFoldingSuite.scala     | 99 ++++++++++++++++++++++
 .../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++-
 21 files changed, 433 insertions(+), 129 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 7b39ab7266c..a12a8000870 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1389,6 +1389,11 @@
           "<statement> with multiple part function name(<funcName>) is not allowed."
         ]
       },
+      "OPTION_IS_INVALID" : {
+        "message" : [
+          "option or property key <key> is invalid; only <supported> are supported"
+        ]
+      },
       "REPETITIVE_WINDOW_DEFINITION" : {
         "message" : [
           "The definition of window <windowName> is repetitive."
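As a quick usage sketch of the behavior this commit adds (the table name `t` and option key `k`
are illustrative; the outcomes mirror the expectations in the new TableOptionsConstantFoldingSuite
further below):

    CREATE TABLE t (col INT) USING json
    OPTIONS ('k' = 'a' || 'b');
    -- the OPTIONS value is constant-folded during analysis, storing 'k' -> "ab"

    CREATE TABLE t (col INT) USING json
    OPTIONS ('k' = raise_error('failure'));
    -- non-constant values now fail with INVALID_SQL_SYNTAX.OPTION_IS_INVALID
    -- (the error class added above) instead of a generic syntax error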
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 89100f2aeec..c7b238bfd2c 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -374,7 +374,7 @@ tableProvider
     ;
 
 createTableClauses
-    :((OPTIONS options=propertyList) |
+    :((OPTIONS options=expressionPropertyList) |
      (PARTITIONED BY partitioning=partitionFieldList) |
      skewSpec |
      bucketSpec |
@@ -405,6 +405,14 @@ propertyValue
     | stringLit
     ;
 
+expressionPropertyList
+    : LEFT_PAREN expressionProperty (COMMA expressionProperty)* RIGHT_PAREN
+    ;
+
+expressionProperty
+    : key=propertyKey (EQ? value=expression)?
+    ;
+
 constantList
     : LEFT_PAREN constant (COMMA constant)* RIGHT_PAREN
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aa1b9d0e8fd..901ae243225 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       ExtractGenerator ::
       ResolveGenerate ::
       ResolveFunctions ::
+      ResolveTableSpec ::
       ResolveAliases ::
       ResolveSubquery ::
       ResolveSubqueryColumnAliases ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
new file mode 100644
index 00000000000..d4b0a8d25e0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.{ArrayType, MapType, StructType}
+
+/**
+ * This object is responsible for processing unresolved table specifications in commands with
+ * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions.
+ * After otherwise resolving such expressions in the analyzer, here we convert them to resolved + * table specifications wherein these OPTIONS list values are represented as strings instead, for + * convenience. + */ +object ResolveTableSpec extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) { + case t: CreateTable => + resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + case t: CreateTableAsSelect => + resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + case t: ReplaceTable => + resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + case t: ReplaceTableAsSelect => + resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + } + } + + /** Helper method to resolve the table specification within a logical plan. */ + private def resolveTableSpec( + input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions, + withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match { + case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved => + val newOptions: Seq[(String, String)] = optionsListExpressions.options.map { + case (key: String, null) => + (key, null) + case (key: String, value: Expression) => + val newValue: String = try { + val dt = value.dataType + value match { + case Literal(null, _) => + null + case _ + if dt.isInstanceOf[ArrayType] || + dt.isInstanceOf[StructType] || + dt.isInstanceOf[MapType] => + throw QueryCompilationErrors.optionMustBeConstant(key) + case _ => + val result = value.eval() + Literal(result, dt).toString + } + } catch { + case _: SparkThrowable | _: java.lang.RuntimeException => + throw QueryCompilationErrors.optionMustBeConstant(key) + } + (key, newValue) + } + val newTableSpec = ResolvedTableSpec( + properties = u.properties, + provider = u.provider, + options = newOptions.toMap, + location = u.location, + comment = u.comment, + serde = u.serde, + external = u.external) + withNewSpec(newTableSpec) + case _ => + input + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f4170860c24..2156ae4d51f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, con import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} -import org.apache.spark.sql.errors.QueryParsingErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -3343,6 +3343,20 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } } + /** + * Parse a key-value map from an [[ExpressionPropertyListContext]], 
assuming all values are + * specified. + */ + override def visitExpressionPropertyList( + ctx: ExpressionPropertyListContext): OptionsListExpressions = { + val options = ctx.expressionProperty.asScala.map { property => + val key: String = visitPropertyKey(property.key) + val value: Expression = Option(property.value).map(expression).getOrElse(null) + key -> value + }.toSeq + OptionsListExpressions(options) + } + override def visitStringLit(ctx: StringLitContext): Token = { if (ctx != null) { if (ctx.STRING_LITERAL != null) { @@ -3377,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit */ type TableClauses = ( Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String], - Map[String, String], Option[String], Option[String], Option[SerdeInfo]) + OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo]) /** * Validate a create table statement and return the [[TableIdentifier]]. @@ -3637,8 +3651,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit ctx.EXTENDED != null) } - def cleanTableProperties( - ctx: ParserRuleContext, properties: Map[String, String]): Map[String, String] = { + def cleanTableProperties[ValueType]( + ctx: ParserRuleContext, properties: Map[String, ValueType]): Map[String, ValueType] = { import TableCatalog._ val legacyOn = conf.getConf(SQLConf.LEGACY_PROPERTY_NON_RESERVED) properties.filter { @@ -3672,18 +3686,26 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit def cleanTableOptions( ctx: ParserRuleContext, - options: Map[String, String], - location: Option[String]): (Map[String, String], Option[String]) = { + options: OptionsListExpressions, + location: Option[String]): (OptionsListExpressions, Option[String]) = { var path = location - val filtered = cleanTableProperties(ctx, options).filter { - case (k, v) if k.equalsIgnoreCase("path") && path.nonEmpty => - throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, v, ctx) - case (k, v) if k.equalsIgnoreCase("path") => - path = Some(v) + val filtered = cleanTableProperties(ctx, options.options.toMap).filter { + case (key, value) if key.equalsIgnoreCase("path") => + val newValue: String = + if (value == null) { + "" + } else value match { + case Literal(_, _: StringType) => value.toString + case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key) + } + if (path.nonEmpty) { + throw QueryParsingErrors.duplicatedTablePathsFoundError(path.get, newValue, ctx) + } + path = Some(newValue) false case _ => true } - (filtered, path) + (OptionsListExpressions(filtered.toSeq), path) } /** @@ -3841,7 +3863,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec) val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) val cleanedProperties = cleanTableProperties(ctx, properties) - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + val options = Option(ctx.options).map(visitExpressionPropertyList) + .getOrElse(OptionsListExpressions(Seq.empty)) val location = visitLocationSpecList(ctx.locationSpec()) val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location) val comment = visitCommentSpecList(ctx.commentSpec()) @@ -3921,8 +3944,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val columns = Option(ctx.createOrReplaceTableColTypeList()) 
.map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil) val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) - val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) = - visitCreateTableClauses(ctx.createTableClauses()) + val (partTransforms, partCols, bucketSpec, properties, options, location, + comment, serdeInfo) = visitCreateTableClauses(ctx.createTableClauses()) if (provider.isDefined && serdeInfo.isDefined) { operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx) @@ -3936,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val partitioning = partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform) - val tableSpec = TableSpec(properties, provider, options, location, comment, + val tableSpec = UnresolvedTableSpec(properties, provider, location, comment, serdeInfo, external) Option(ctx.query).map(plan) match { @@ -3953,14 +3976,15 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit case Some(query) => CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)), - partitioning, query, tableSpec, Map.empty, ifNotExists) + partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options) case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. val schema = StructType(columns ++ partCols) CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)), - schema, partitioning, tableSpec, ignoreIfExists = ifNotExists) + schema, partitioning, tableSpec, ignoreIfExists = ifNotExists, + optionsListExpressions = options) } } @@ -4005,8 +4029,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val partitioning = partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform) - val tableSpec = TableSpec(properties, provider, options, location, comment, - serdeInfo, false) + val tableSpec = UnresolvedTableSpec(properties, provider, location, comment, + serdeInfo, external = false) Option(ctx.query).map(plan) match { case Some(_) if columns.nonEmpty => @@ -4023,7 +4047,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit case Some(query) => ReplaceTableAsSelect( withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)), - partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate) + partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate, + optionsListExpressions = options) case _ => // Note: table schema includes both the table columns list and the partition columns @@ -4031,7 +4056,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val schema = StructType(columns ++ partCols) ReplaceTable( withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)), - schema, partitioning, tableSpec, orCreate = orCreate) + schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 91b69a3c089..a781bf56b9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType} // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame // which is required by the DS v1 API. We need to keep the analyzed input query plan to build @@ -445,7 +445,9 @@ case class CreateTable( tableSchema: StructType, partitioning: Seq[Transform], tableSpec: TableSpec, - ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + ignoreIfExists: Boolean, + optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + extends UnaryCommand with V2CreateTablePlan { override def child: LogicalPlan = name @@ -467,7 +469,8 @@ case class CreateTableAsSelect( tableSpec: TableSpec, writeOptions: Map[String, String], ignoreIfExists: Boolean, - isAnalyzed: Boolean = false) + isAnalyzed: Boolean = false, + optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) extends V2CreateTableAsSelectPlan { override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true) @@ -496,7 +499,9 @@ case class ReplaceTable( tableSchema: StructType, partitioning: Seq[Transform], tableSpec: TableSpec, - orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan { + orCreate: Boolean, + optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + extends UnaryCommand with V2CreateTablePlan { override def child: LogicalPlan = name @@ -521,7 +526,8 @@ case class ReplaceTableAsSelect( tableSpec: TableSpec, writeOptions: Map[String, String], orCreate: Boolean, - isAnalyzed: Boolean = false) + isAnalyzed: Boolean = false, + optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) extends V2CreateTableAsSelectPlan { override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true) @@ -1382,11 +1388,61 @@ case class DropIndex( copy(table = newChild) } -case class TableSpec( +trait TableSpec { + def properties: Map[String, String] + def provider: Option[String] + def location: Option[String] + def comment: Option[String] + def serde: Option[SerdeInfo] + def external: Boolean + def withNewLocation(newLocation: Option[String]): TableSpec +} + +case class UnresolvedTableSpec( + properties: Map[String, String], + provider: Option[String], + location: Option[String], + comment: Option[String], + serde: Option[SerdeInfo], + external: Boolean) extends TableSpec { + override def withNewLocation(loc: Option[String]): TableSpec = { + UnresolvedTableSpec(properties, provider, loc, comment, serde, external) + } +} + +/** + * This contains the expressions in an OPTIONS list. We store it alongside anywhere the above + * UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can + * descend into the child expressions naturally without extra treatment. 
+ */ +case class OptionsListExpressions(options: Seq[(String, Expression)]) + extends Expression with Unevaluable { + override def nullable: Boolean = true + override def dataType: DataType = MapType(StringType, StringType) + override def children: Seq[Expression] = options.map(_._2) + + override protected def withNewChildrenInternal( + newChildren: IndexedSeq[Expression]): Expression = { + assert(options.length == newChildren.length) + val newOptions = options.zip(newChildren).map { + case ((key: String, _), newChild: Expression) => + (key, newChild) + } + OptionsListExpressions(newOptions) + } + + lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved) +} + +case class ResolvedTableSpec( properties: Map[String, String], provider: Option[String], options: Map[String, String], location: Option[String], comment: Option[String], serde: Option[SerdeInfo], - external: Boolean) + external: Boolean) extends TableSpec { + override def withNewLocation(newLocation: Option[String]): TableSpec = { + ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 7fa048c5dc3..caf679f3e7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -92,6 +92,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveLateralColumnAliasReference" :: "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" :: "org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" :: + "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: "org.apache.spark.sql.catalyst.analysis.ResolveWindowTime" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 75802de1a66..17c89d7e6ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.TableSpec +import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.RuleIdCollection @@ -927,9 +927,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map.asCaseSensitiveMap().asScala, maxFields) case map: Map[_, _] => redactMapString(map, maxFields) - case t: TableSpec => + case t: ResolvedTableSpec => t.copy(properties = Utils.redact(t.properties).toMap, options = Utils.redact(t.options).toMap) :: Nil + case t: UnresolvedTableSpec => + t.copy(properties = Utils.redact(t.properties).toMap) :: Nil case table: 
CatalogTable => stringArgsForCatalogTable(table) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e5d9720bb02..e92c1ee75a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec} import org.apache.spark.sql.catalyst.util.GeneratedColumn import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ @@ -376,7 +376,8 @@ private[sql] object CatalogV2Util { def convertTableProperties(t: TableSpec): Map[String, String] = { val props = convertTableProperties( - t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external) + t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment, + t.provider, t.external) withDefaultOwnership(props) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 94b8ee25dd2..4c87b9da1c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3567,4 +3567,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { ) ) } + + def optionMustBeLiteralString(key: String): Throwable = { + new AnalysisException( + errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID", + messageParameters = Map( + "key" -> key, + "supported" -> "literal strings")) + } + + def optionMustBeConstant(key: String, cause: Option[Throwable] = None): Throwable = { + new AnalysisException( + errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID", + messageParameters = Map( + "key" -> key, + "supported" -> "constant expressions"), + cause = cause) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index ba312ddbc49..d1651e536dd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -20,17 +20,15 @@ package org.apache.spark.sql.catalyst.analysis import java.util import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec} import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog} import 
org.apache.spark.sql.connector.expressions.Expressions import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CreateTablePartitioningValidationSuite extends AnalysisTest { - + val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) test("CreateTableAsSelect: fail missing top-level column") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "does_not_exist") :: Nil, @@ -46,8 +44,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing top-level column nested reference") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "does_not_exist.z") :: Nil, @@ -63,8 +59,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail missing nested column") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "point.z") :: Nil, @@ -80,8 +74,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: fail with multiple errors") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "does_not_exist", "point.z") :: Nil, @@ -97,8 +89,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success with top-level column") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "id") :: Nil, @@ -111,8 +101,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using nested column") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "point.x") :: Nil, @@ -125,8 +113,6 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest { } test("CreateTableAsSelect: success using complex column") { - val tableSpec = TableSpec(Map.empty, None, Map.empty, - None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, "point") :: Nil, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 53635acf0b3..8cfdf411ae9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -22,13 +22,13 @@ import java.util.Locale import org.apache.spark.SparkThrowable import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} -import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{GeneratedColumn, 
ResolveDefaultColumns} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType} import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} class DDLParserSuite extends AnalysisTest { @@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None), @@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("a"))), Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest { LiteralValue(34, IntegerType)))), Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest { List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))), Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, Some("abc"), None) @@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map("test" -> "test"), Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), Some("/tmp/file"), None, None) @@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -381,7 +381,7 @@ class 
DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some("parquet")))) @@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map( @@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map( @@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat"))))) @@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, Some(SerdeInfo( @@ -878,9 +878,13 @@ class DDLParserSuite extends AnalysisTest { Seq("table_name"), Some(new StructType), Seq.empty[Transform], - Map.empty[String, String], + Map.empty, Some("json"), - Map("a" -> "1", "b" -> "0.1", "c" -> "true"), + OptionsListExpressions( + Seq( + ("a", Literal(1)), + ("b", Literal(Decimal(0.1))), + ("c" -> Literal(true)))), None, None, None), @@ -935,7 +939,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map("p1" -> "v1", "p2" -> "v2"), Some("parquet"), - Map.empty[String, String], + OptionsListExpressions(Seq.empty), Some("/user/external/page_view"), Some("This is the staging page view table"), None) @@ -2472,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest { partitioning: Seq[Transform], properties: Map[String, String], provider: Option[String], - options: Map[String, String], + options: OptionsListExpressions, location: Option[String], comment: Option[String], serdeInfo: Option[SerdeInfo], @@ -2488,7 +2492,7 @@ class DDLParserSuite extends AnalysisTest { create.partitioning, create.tableSpec.properties, create.tableSpec.provider, - create.tableSpec.options, + create.optionsListExpressions, create.tableSpec.location, create.tableSpec.comment, create.tableSpec.serde, @@ -2500,7 +2504,7 @@ class DDLParserSuite extends AnalysisTest { replace.partitioning, replace.tableSpec.properties, replace.tableSpec.provider, - replace.tableSpec.options, + replace.optionsListExpressions, replace.tableSpec.location, replace.tableSpec.comment, replace.tableSpec.serde) @@ -2511,7 +2515,7 @@ class DDLParserSuite extends AnalysisTest { ctas.partitioning, ctas.tableSpec.properties, ctas.tableSpec.provider, - ctas.tableSpec.options, + ctas.optionsListExpressions, ctas.tableSpec.location, ctas.tableSpec.comment, ctas.tableSpec.serde, @@ -2523,7 +2527,7 @@ class DDLParserSuite extends AnalysisTest { rtas.partitioning, rtas.tableSpec.properties, rtas.tableSpec.provider, - rtas.tableSpec.options, + rtas.optionsListExpressions, rtas.tableSpec.location, rtas.tableSpec.comment, rtas.tableSpec.serde) @@ -2560,7 +2564,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, 
String], None, - Map.empty[String, String], + OptionsListExpressions(Seq.empty), None, None, None) @@ -2610,8 +2614,8 @@ class DDLParserSuite extends AnalysisTest { .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build()) val createTableResult = CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn, - Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), - Map.empty[String, String], None, None, None, false), false) + Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), + None, None, None, false), false) // Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT // options, to make sure that the parser accepts any ordering of these options. comparePlans(parsePlan( @@ -2623,8 +2627,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " + "b STRING NOT NULL DEFAULT \"abc\") USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn, - Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), - Map.empty[String, String], None, None, None, false), false)) + Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), + None, None, None, false), false)) // These ALTER TABLE statements should parse successfully. comparePlans( parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"), @@ -2779,13 +2783,13 @@ class DDLParserSuite extends AnalysisTest { comparePlans(parsePlan( "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, - Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), - Map.empty[String, String], None, None, None, false), false)) + Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), + None, None, None, false), false)) comparePlans(parsePlan( "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, - Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), - Map.empty[String, String], None, None, None, false), false)) + Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), + None, None, None, false), false)) // Two generation expressions checkError( exception = parseException("CREATE TABLE my_tab(a INT, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index da93fdf58e9..a3cb12307fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -326,10 +326,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val catalog = CatalogV2Util.getTableProviderCatalog( supportsExtract, catalogManager, dsOptions) - val tableSpec = TableSpec( + val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), - options = Map.empty, location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, @@ -591,10 +590,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap) case (SaveMode.Overwrite, _) => - val tableSpec = TableSpec( + val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), - options = Map.empty, location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, @@ -611,10 +609,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // We have a potential race condition here in AppendMode, if the table suddenly gets // created between our existence check and physical execution, but this can't be helped // in any case. - val tableSpec = TableSpec( + val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), - options = Map.empty, location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 270bf0a948e..101dd7ec299 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.IntegerType @@ -107,10 +107,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } override def create(): Unit = { - val tableSpec = TableSpec( + val tableSpec = UnresolvedTableSpec( properties = properties.toMap, provider = provider, - options = Map.empty, location = None, comment = None, serde = None, @@ -196,10 +195,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } private def internalReplace(orCreate: Boolean): Unit = { - val 
tableSpec = TableSpec( + val tableSpec = UnresolvedTableSpec( properties = properties.toMap, provider = provider, - options = Map.empty, location = None, comment = None, serde = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index b2b35b40492..515b0bb90bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -158,9 +158,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. - case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) => + case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde, + c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde, ctas = false) if (!isV2Provider(provider)) { constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning, @@ -169,10 +169,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case c @ CreateTableAsSelect(ResolvedV1Identifier(ident), _, _, _, writeOptions, _, _) => + case c @ CreateTableAsSelect( + ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, - c.tableSpec.options ++ writeOptions, + tableSpec.options ++ writeOptions, c.tableSpec.location, c.tableSpec.serde, ctas = true) @@ -192,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the // session catalog and the table provider is not v2. 
- case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) => + case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( @@ -201,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => + case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e3ae1b83a16..dfe3c67e18b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -29,17 +29,18 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunctionName, UnresolvedIdentifier} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER import org.apache.spark.sql.catalyst.util.DateTimeConstants -import org.apache.spark.sql.errors.QueryParsingErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryParsingErrors} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.types.StringType /** * Concrete parser for Spark SQL statements. 
@@ -332,7 +333,19 @@ class SparkSqlAstBuilder extends AstBuilder {
     withIdentClause(identCtx, ident => {
       val table = tableIdentifier(ident, "CREATE TEMPORARY VIEW", ctx)
-      val optionsWithLocation = location.map(l => options + ("path" -> l)).getOrElse(options)
+      val optionsList: Map[String, String] =
+        options.options.map { case (key, value) =>
+          val newValue: String =
+            if (value == null) {
+              null
+            } else value match {
+              case Literal(_, _: StringType) => value.toString
+              case _ => throw QueryCompilationErrors.optionMustBeLiteralString(key)
+            }
+          (key, newValue)
+        }.toMap
+      val optionsWithLocation =
+        location.map(l => optionsList + ("path" -> l)).getOrElse(optionsList)
       CreateTempViewUsing(table, schema, replace = false, global = false, provider,
         optionsWithLocation)
     })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 471a9393b7d..07cce4b931b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -103,8 +103,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
   }

   private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
-    tableSpec.copy(
-      location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
+    tableSpec.withNewLocation(tableSpec.location.map(makeQualifiedDBObjectPath(_)))
   }

   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -173,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil

     case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
-        tableSpec, ifNotExists) =>
+        tableSpec, ifNotExists, unresolvedOptionsList) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -185,7 +184,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil

     case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
-        options, ifNotExists, true) =>
+        options, ifNotExists, true, unresolvedOptionsList) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicCreateTableAsSelectExec(staging, ident, parts, query,
@@ -198,7 +197,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case RefreshTable(r: ResolvedTable) =>
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil

-    case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
+    case ReplaceTable(
+        ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate,
+        unresolvedOptionsList) =>
       ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident)
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
@@ -217,7 +218,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }

     case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
-        parts, query, tableSpec, options, orCreate, true) =>
+        parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) =>
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 55136442b1a..2aac82a990e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, TableSpec, View}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -661,10 +662,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       None
     }

-    val tableSpec = TableSpec(
+    val newOptions = OptionsListExpressions(options.map { case (key, value) =>
+      (key, Literal(value).asInstanceOf[Expression])
+    }.toSeq)
+    val tableSpec = UnresolvedTableSpec(
       properties = Map(),
       provider = Some(source),
-      options = options,
       location = location,
       comment = { if (description.isEmpty) None else Some(description) },
       serde = None,
@@ -675,7 +678,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       tableSchema = schema,
       partitioning = Seq(),
       tableSpec = tableSpec,
-      ignoreIfExists = false)
+      ignoreIfExists = false,
+      optionsListExpressions = newOptions)

     sparkSession.sessionState.executePlan(plan).toRdd
     sparkSession.table(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 2e8d671ad70..f913faa030d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback}
@@ -290,10 +290,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
        * Note, currently the new table creation by this API doesn't fully cover the V2 table.
        * TODO (SPARK-33638): Full support of v2 table creation
        */
-      val tableSpec = TableSpec(
+      val tableSpec = UnresolvedTableSpec(
         Map.empty[String, String],
         Some(source),
-        Map.empty[String, String],
         extraOptions.get("path"),
         None,
         None,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
new file mode 100644
index 00000000000..2fab2d0f1af
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TableOptionsConstantFoldingSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.test.SharedSparkSession
+
+/** These tests exercise passing constant but non-literal OPTIONS lists, and folding them. */
+class TableOptionsConstantFoldingSuite extends QueryTest with SharedSparkSession {
+  val prefix = "create table t (col int) using json options "
+
+  /** Helper method to create a table with an OPTIONS list and then check the resulting value. */
+  def checkOption(createOption: String, expectedValue: String): Unit = {
+    withTable("t") {
+      sql(s"$prefix ('k' = $createOption)")
+      sql("insert into t values (42)")
+      checkAnswer(spark.table("t"), Seq(Row(42)))
+      val actual = spark.table("t")
+        .queryExecution.sparkPlan.asInstanceOf[FileSourceScanExec].relation.options
+      assert(actual.get("k").get == expectedValue)
+    }
+  }
+
+  test("SPARK-43529: Support constant expressions in CREATE/REPLACE TABLE OPTIONS") {
+    checkOption("1 + 2", "3")
+    checkOption("'a' || 'b'", "ab")
+    checkOption("true or false", "true")
+    checkOption("null", null)
+    checkOption("cast('11 23:4:0' as interval day to second)",
+      "INTERVAL '11 23:04:00' DAY TO SECOND")
+    checkOption("date_diff(current_date(), current_date())", "0")
+    checkOption("date_sub(date'2022-02-02', 1)", "2022-02-01")
+    checkOption("timestampadd(microsecond, 5, timestamp'2022-02-28 00:00:00')",
+      "2022-02-28 00:00:00.000005")
+    checkOption("round(cast(2.25 as decimal(5, 3)), 1)", "2.3")
+    // The result of invoking this "ROUND" function call is NULL, since the target decimal type is
+    // too narrow to contain the result of the cast.
+    checkOption("round(cast(2.25 as decimal(3, 3)), 1)", "null")
+
+    // Test some cases where the provided option value is a non-constant or invalid expression.
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = 1 + 2 + unresolvedAttribute)")),
+      errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      parameters = Map(
+        "objectName" -> "`unresolvedAttribute`"),
+      queryContext = Array(ExpectedContext("", "", 60, 78, "unresolvedAttribute")))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = true or false or unresolvedAttribute)")),
+      errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      parameters = Map(
+        "objectName" -> "`unresolvedAttribute`"),
+      queryContext = Array(ExpectedContext("", "", 69, 87, "unresolvedAttribute")))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = cast(array('9', '9') as array<byte>))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = cast(map('9', '9') as map<string, string>))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = raise_error('failure'))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"$prefix ('k' = raise_error('failure'))")),
+      errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
+      parameters = Map(
+        "key" -> "k",
+        "supported" -> "constant expressions"))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index a51ac78fdb2..4fe7809162f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.connector

 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -51,8 +51,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -75,8 +74,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = CreateTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
@@ -100,8 +98,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("ID", "iD").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
@@ -124,8 +121,7 @@ class V2CommandsCaseSensitivitySuite
     Seq(true, false).foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
         Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref =>
-          val tableSpec = TableSpec(Map.empty, None, Map.empty,
-            None, None, None, false)
+          val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false)
           val plan = ReplaceTableAsSelect(
             UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
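For reference, the user-facing behavior exercised by TableOptionsConstantFoldingSuite can be reproduced with a short standalone sketch. This is illustrative only, not code from the commit; it assumes a local SparkSession and the built-in json source:

    import org.apache.spark.sql.SparkSession

    object OptionsFoldingExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        // A foldable expression is accepted as an OPTIONS value and is
        // constant-folded to a string during analysis, so this stores the
        // option 'k' -> "ab", matching checkOption("'a' || 'b'", "ab") above.
        spark.sql("CREATE TABLE t (col INT) USING json OPTIONS ('k' = 'a' || 'b')")
        spark.stop()
      }
    }

A non-constant value in the same position (for example, a column reference) instead fails analysis, as the checkError cases in the suite show.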
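Note also the asymmetry preserved by the SparkSqlAstBuilder hunk above: CREATE TEMPORARY VIEW stays stricter and still requires plain string-literal option values. That check can be restated as a condensed standalone sketch (the object and method names here are hypothetical; the real code raises QueryCompilationErrors.optionMustBeLiteralString):

    import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
    import org.apache.spark.sql.types.StringType

    object TempViewOptions {
      // Accept only null or string-literal values; reject any other expression,
      // mirroring the CREATE TEMPORARY VIEW path in SparkSqlAstBuilder.
      def requireLiteralStrings(options: Seq[(String, Expression)]): Map[String, String] =
        options.map { case (key, value) =>
          val stringValue = value match {
            case null => null
            case Literal(_, _: StringType) => value.toString
            // Stand-in error; the real code uses a dedicated error class.
            case _ => throw new IllegalArgumentException(
              s"option '$key' must be a literal string")
          }
          key -> stringValue
        }.toMap
    }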
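The CatalogImpl hunk shows the other direction: options that arrive as plain strings through the public Catalog API are wrapped in Literal expressions, so programmatic callers and SQL funnel through the same OPTIONS resolution. A minimal sketch of that wrapping (hypothetical helper name):

    import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

    object OptionWrapping {
      // A string option is already constant, so wrapping it as a Literal makes
      // it trivially foldable by the same analysis applied to SQL OPTIONS lists.
      def toExpressions(options: Map[String, String]): Seq[(String, Expression)] =
        options.map { case (key, value) =>
          key -> (Literal(value): Expression)
        }.toSeq
    }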