This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7e94f2a5433 [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans 7e94f2a5433 is described below commit 7e94f2a543371b7d507e7bffab1ce7e44328dda5 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Tue Jun 13 10:34:19 2023 -0700 [SPARK-43529][SQL][FOLLOWUP] Code cleanup in UnresolvedTableSpec and related plans ### What changes were proposed in this pull request? Follow-up of https://github.com/apache/spark/pull/41191 to clean up the code in UnresolvedTableSpec and related plans: * Rename `OptionsListExpressions` as `OptionList` * Rename `trait TableSpec` as `TableSpecBase` * Rename `ResolvedTableSpec` as `TableSpec`, make sure all the physical plans are using `TableSpec` instead of `TableSpecBase`. * Move option list expressions to UnresolvedTableSpec, so that all the specs are in one class. * Make `UnresolvedTableSpec` a `UnaryExpression`, so that transforming with `mapExpressions` will transform it and the option list expressions in its child * Restore the signatures of class `CreateTable`, `CreateTableAsSelect`, `ReplaceTable` and `ReplaceTableAsSelect` ### Why are the changes needed? Make the code implementation simpler ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #41549 from gengliangwang/optionsFollowUp. 
Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../sql/catalyst/analysis/ResolveTableSpec.scala | 19 ++-- .../spark/sql/catalyst/parser/AstBuilder.scala | 28 +++--- .../sql/catalyst/plans/logical/v2Commands.scala | 57 ++++++----- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +- .../sql/connector/catalog/CatalogV2Util.scala | 4 +- .../CreateTablePartitioningValidationSuite.scala | 5 +- .../spark/sql/catalyst/parser/DDLParserSuite.scala | 106 +++++++++++---------- .../org/apache/spark/sql/DataFrameWriter.scala | 5 +- .../org/apache/spark/sql/DataFrameWriterV2.scala | 4 +- .../catalyst/analysis/ResolveSessionCatalog.scala | 10 +- .../datasources/v2/DataSourceV2Strategy.scala | 11 +-- .../apache/spark/sql/internal/CatalogImpl.scala | 8 +- .../spark/sql/streaming/DataStreamWriter.scala | 3 +- .../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++- 14 files changed, 148 insertions(+), 132 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala index d4b0a8d25e0..69a5b13124a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala @@ -36,22 +36,23 @@ object ResolveTableSpec extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) { case t: CreateTable => - resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s)) case t: CreateTableAsSelect => - resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s)) case t: ReplaceTable => - 
resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s)) case t: ReplaceTableAsSelect => - resolveTableSpec(t, t.tableSpec, t.optionsListExpressions, s => t.copy(tableSpec = s)) + resolveTableSpec(t, t.tableSpec, s => t.copy(tableSpec = s)) } } /** Helper method to resolve the table specification within a logical plan. */ private def resolveTableSpec( - input: LogicalPlan, tableSpec: TableSpec, optionsListExpressions: OptionsListExpressions, - withNewSpec: TableSpec => LogicalPlan): LogicalPlan = tableSpec match { - case u: UnresolvedTableSpec if optionsListExpressions.allOptionsResolved => - val newOptions: Seq[(String, String)] = optionsListExpressions.options.map { + input: LogicalPlan, + tableSpec: TableSpecBase, + withNewSpec: TableSpecBase => LogicalPlan): LogicalPlan = tableSpec match { + case u: UnresolvedTableSpec if u.optionExpression.resolved => + val newOptions: Seq[(String, String)] = u.optionExpression.options.map { case (key: String, null) => (key, null) case (key: String, value: Expression) => @@ -75,7 +76,7 @@ object ResolveTableSpec extends Rule[LogicalPlan] { } (key, newValue) } - val newTableSpec = ResolvedTableSpec( + val newTableSpec = TableSpec( properties = u.properties, provider = u.provider, options = newOptions.toMap, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2156ae4d51f..a076385573e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3348,13 +3348,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * specified. 
*/ override def visitExpressionPropertyList( - ctx: ExpressionPropertyListContext): OptionsListExpressions = { + ctx: ExpressionPropertyListContext): OptionList = { val options = ctx.expressionProperty.asScala.map { property => val key: String = visitPropertyKey(property.key) val value: Expression = Option(property.value).map(expression).getOrElse(null) key -> value }.toSeq - OptionsListExpressions(options) + OptionList(options) } override def visitStringLit(ctx: StringLitContext): Token = { @@ -3391,7 +3391,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit */ type TableClauses = ( Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String], - OptionsListExpressions, Option[String], Option[String], Option[SerdeInfo]) + OptionList, Option[String], Option[String], Option[SerdeInfo]) /** * Validate a create table statement and return the [[TableIdentifier]]. @@ -3686,8 +3686,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit def cleanTableOptions( ctx: ParserRuleContext, - options: OptionsListExpressions, - location: Option[String]): (OptionsListExpressions, Option[String]) = { + options: OptionList, + location: Option[String]): (OptionList, Option[String]) = { var path = location val filtered = cleanTableProperties(ctx, options.options.toMap).filter { case (key, value) if key.equalsIgnoreCase("path") => @@ -3705,7 +3705,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit false case _ => true } - (OptionsListExpressions(filtered.toSeq), path) + (OptionList(filtered.toSeq), path) } /** @@ -3864,7 +3864,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty) val cleanedProperties = cleanTableProperties(ctx, properties) val options = Option(ctx.options).map(visitExpressionPropertyList) - .getOrElse(OptionsListExpressions(Seq.empty)) + 
.getOrElse(OptionList(Seq.empty)) val location = visitLocationSpecList(ctx.locationSpec()) val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location) val comment = visitCommentSpecList(ctx.commentSpec()) @@ -3959,7 +3959,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val partitioning = partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform) - val tableSpec = UnresolvedTableSpec(properties, provider, location, comment, + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, serdeInfo, external) Option(ctx.query).map(plan) match { @@ -3976,15 +3976,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit case Some(query) => CreateTableAsSelect(withIdentClause(identifierContext, UnresolvedIdentifier(_)), - partitioning, query, tableSpec, Map.empty, ifNotExists, optionsListExpressions = options) + partitioning, query, tableSpec, Map.empty, ifNotExists) case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. 
val schema = StructType(columns ++ partCols) CreateTable(withIdentClause(identifierContext, UnresolvedIdentifier(_)), - schema, partitioning, tableSpec, ignoreIfExists = ifNotExists, - optionsListExpressions = options) + schema, partitioning, tableSpec, ignoreIfExists = ifNotExists) } } @@ -4029,7 +4028,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val partitioning = partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform) - val tableSpec = UnresolvedTableSpec(properties, provider, location, comment, + val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment, serdeInfo, external = false) Option(ctx.query).map(plan) match { @@ -4047,8 +4046,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit case Some(query) => ReplaceTableAsSelect( withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)), - partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate, - optionsListExpressions = options) + partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate) case _ => // Note: table schema includes both the table columns list and the partition columns @@ -4056,7 +4054,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit val schema = StructType(columns ++ partCols) ReplaceTable( withIdentClause(ctx.replaceTableHeader.identifierReference(), UnresolvedIdentifier(_)), - schema, partitioning, tableSpec, orCreate = orCreate, optionsListExpressions = options) + schema, partitioning, tableSpec, orCreate = orCreate) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index a781bf56b9b..bd646b7f692 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, Unevaluable, V2ExpressionUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, UnaryExpression, Unevaluable, V2ExpressionUtils} import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema import org.apache.spark.sql.catalyst.trees.BinaryLike import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, RowDeltaUtils, WriteDeltaProjections} @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowLevelOperationTable, SupportsDelta, Write} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType} +import org.apache.spark.util.Utils // For v2 DML commands, it may end up with the v1 fallback code path and need to build a DataFrame // which is required by the DS v1 API. 
We need to keep the analyzed input query plan to build @@ -444,9 +445,8 @@ case class CreateTable( name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], - tableSpec: TableSpec, - ignoreIfExists: Boolean, - optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + tableSpec: TableSpecBase, + ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { override def child: LogicalPlan = name @@ -466,11 +466,10 @@ case class CreateTableAsSelect( name: LogicalPlan, partitioning: Seq[Transform], query: LogicalPlan, - tableSpec: TableSpec, + tableSpec: TableSpecBase, writeOptions: Map[String, String], ignoreIfExists: Boolean, - isAnalyzed: Boolean = false, - optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + isAnalyzed: Boolean = false) extends V2CreateTableAsSelectPlan { override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true) @@ -498,9 +497,8 @@ case class ReplaceTable( name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], - tableSpec: TableSpec, - orCreate: Boolean, - optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + tableSpec: TableSpecBase, + orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan { override def child: LogicalPlan = name @@ -523,11 +521,10 @@ case class ReplaceTableAsSelect( name: LogicalPlan, partitioning: Seq[Transform], query: LogicalPlan, - tableSpec: TableSpec, + tableSpec: TableSpecBase, writeOptions: Map[String, String], orCreate: Boolean, - isAnalyzed: Boolean = false, - optionsListExpressions: OptionsListExpressions = OptionsListExpressions(Seq.empty)) + isAnalyzed: Boolean = false) extends V2CreateTableAsSelectPlan { override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = copy(isAnalyzed = true) @@ -1388,25 +1385,34 @@ case class DropIndex( copy(table = newChild) } -trait TableSpec { +trait TableSpecBase { def properties: Map[String, String] def 
provider: Option[String] def location: Option[String] def comment: Option[String] def serde: Option[SerdeInfo] def external: Boolean - def withNewLocation(newLocation: Option[String]): TableSpec } case class UnresolvedTableSpec( properties: Map[String, String], provider: Option[String], + optionExpression: OptionList, location: Option[String], comment: Option[String], serde: Option[SerdeInfo], - external: Boolean) extends TableSpec { - override def withNewLocation(loc: Option[String]): TableSpec = { - UnresolvedTableSpec(properties, provider, loc, comment, serde, external) + external: Boolean) extends UnaryExpression with Unevaluable with TableSpecBase { + + override def dataType: DataType = + throw new UnsupportedOperationException("UnresolvedTableSpec doesn't have a data type") + + override def child: Expression = optionExpression + + override protected def withNewChildInternal(newChild: Expression): Expression = + this.copy(optionExpression = newChild.asInstanceOf[OptionList]) + + override def simpleString(maxFields: Int): String = { + this.copy(properties = Utils.redact(properties).toMap).toString } } @@ -1415,11 +1421,12 @@ case class UnresolvedTableSpec( * UnresolvedTableSpec lives. We use a separate object so that tree traversals in analyzer rules can * descend into the child expressions naturally without extra treatment. 
*/ -case class OptionsListExpressions(options: Seq[(String, Expression)]) +case class OptionList(options: Seq[(String, Expression)]) extends Expression with Unevaluable { override def nullable: Boolean = true override def dataType: DataType = MapType(StringType, StringType) override def children: Seq[Expression] = options.map(_._2) + override lazy val resolved: Boolean = options.map(_._2).forall(_.resolved) override protected def withNewChildrenInternal( newChildren: IndexedSeq[Expression]): Expression = { @@ -1428,21 +1435,19 @@ case class OptionsListExpressions(options: Seq[(String, Expression)]) case ((key: String, _), newChild: Expression) => (key, newChild) } - OptionsListExpressions(newOptions) + OptionList(newOptions) } - - lazy val allOptionsResolved: Boolean = options.map(_._2).forall(_.resolved) } -case class ResolvedTableSpec( +case class TableSpec( properties: Map[String, String], provider: Option[String], options: Map[String, String], location: Option[String], comment: Option[String], serde: Option[SerdeInfo], - external: Boolean) extends TableSpec { - override def withNewLocation(newLocation: Option[String]): TableSpec = { - ResolvedTableSpec(properties, provider, options, newLocation, comment, serde, external) + external: Boolean) extends TableSpecBase { + def withNewLocation(newLocation: Option[String]): TableSpec = { + TableSpec(properties, provider, options, newLocation, comment, serde, external) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 17c89d7e6ae..75802de1a66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, 
CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType -import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.RuleIdCollection @@ -927,11 +927,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map.asCaseSensitiveMap().asScala, maxFields) case map: Map[_, _] => redactMapString(map, maxFields) - case t: ResolvedTableSpec => + case t: TableSpec => t.copy(properties = Utils.redact(t.properties).toMap, options = Utils.redact(t.options).toMap) :: Nil - case t: UnresolvedTableSpec => - t.copy(properties = Utils.redact(t.properties).toMap) :: Nil case table: CatalogTable => stringArgsForCatalogTable(table) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index e92c1ee75a6..be569b1de9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{ResolvedTableSpec, SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} import 
org.apache.spark.sql.catalyst.util.GeneratedColumn import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ @@ -376,7 +376,7 @@ private[sql] object CatalogV2Util { def convertTableProperties(t: TableSpec): Map[String, String] = { val props = convertTableProperties( - t.properties, t.asInstanceOf[ResolvedTableSpec].options, t.serde, t.location, t.comment, + t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external) withDefaultOwnership(props) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala index d1651e536dd..4158dc9e273 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala @@ -20,14 +20,15 @@ package org.apache.spark.sql.catalyst.analysis import java.util import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LeafNode, OptionList, UnresolvedTableSpec} import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, Table, TableCapability, TableCatalog} import org.apache.spark.sql.connector.expressions.Expressions import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class CreateTablePartitioningValidationSuite extends AnalysisTest { - val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) + val tableSpec = + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) 
test("CreateTableAsSelect: fail missing top-level column") { val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 8cfdf411ae9..f07de11727e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -57,7 +57,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -83,7 +83,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None), @@ -103,7 +103,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("a"))), Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -157,7 +157,7 @@ class DDLParserSuite extends AnalysisTest { LiteralValue(34, IntegerType)))), Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -179,7 +179,7 @@ class DDLParserSuite extends AnalysisTest { List(bucket(5, Array(FieldReference.column("a")), Array(FieldReference.column("b")))), Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -198,7 +198,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, Some("abc"), None) @@ -218,7 +218,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], 
Map("test" -> "test"), Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -236,7 +236,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), Some("/tmp/file"), None, None) @@ -254,7 +254,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -272,7 +272,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -290,7 +290,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -308,7 +308,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -381,7 +381,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some("parquet")))) @@ -406,7 +406,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some(format), serde = Some("customSerde"), serdeProperties = Map( @@ -463,7 +463,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + 
OptionList(Seq.empty), None, None, Some(SerdeInfo(storedAs = Some("textfile"), serdeProperties = Map( @@ -514,7 +514,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, Some(SerdeInfo(formatClasses = Some(FormatClasses("inFormat", "outFormat"))))) @@ -537,7 +537,7 @@ class DDLParserSuite extends AnalysisTest { Seq(IdentityTransform(FieldReference("part"))), Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, Some(SerdeInfo( @@ -880,7 +880,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map.empty, Some("json"), - OptionsListExpressions( + OptionList( Seq( ("a", Literal(1)), ("b", Literal(Decimal(0.1))), @@ -939,7 +939,7 @@ class DDLParserSuite extends AnalysisTest { Seq.empty[Transform], Map("p1" -> "v1", "p2" -> "v2"), Some("parquet"), - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), Some("/user/external/page_view"), Some("This is the staging page view table"), None) @@ -2476,7 +2476,7 @@ class DDLParserSuite extends AnalysisTest { partitioning: Seq[Transform], properties: Map[String, String], provider: Option[String], - options: OptionsListExpressions, + options: OptionList, location: Option[String], comment: Option[String], serdeInfo: Option[SerdeInfo], @@ -2486,51 +2486,55 @@ class DDLParserSuite extends AnalysisTest { def apply(plan: LogicalPlan): TableSpec = { plan match { case create: CreateTable => + val tableSpec = create.tableSpec.asInstanceOf[UnresolvedTableSpec] TableSpec( create.name.asInstanceOf[UnresolvedIdentifier].nameParts, Some(create.tableSchema), create.partitioning, - create.tableSpec.properties, - create.tableSpec.provider, - create.optionsListExpressions, - create.tableSpec.location, - create.tableSpec.comment, - create.tableSpec.serde, - create.tableSpec.external) + tableSpec.properties, + 
tableSpec.provider, + tableSpec.optionExpression, + tableSpec.location, + tableSpec.comment, + tableSpec.serde, + tableSpec.external) case replace: ReplaceTable => + val tableSpec = replace.tableSpec.asInstanceOf[UnresolvedTableSpec] TableSpec( replace.name.asInstanceOf[UnresolvedIdentifier].nameParts, Some(replace.tableSchema), replace.partitioning, - replace.tableSpec.properties, - replace.tableSpec.provider, - replace.optionsListExpressions, - replace.tableSpec.location, - replace.tableSpec.comment, - replace.tableSpec.serde) + tableSpec.properties, + tableSpec.provider, + tableSpec.optionExpression, + tableSpec.location, + tableSpec.comment, + tableSpec.serde) case ctas: CreateTableAsSelect => + val tableSpec = ctas.tableSpec.asInstanceOf[UnresolvedTableSpec] TableSpec( ctas.name.asInstanceOf[UnresolvedIdentifier].nameParts, Some(ctas.query).filter(_.resolved).map(_.schema), ctas.partitioning, - ctas.tableSpec.properties, - ctas.tableSpec.provider, - ctas.optionsListExpressions, - ctas.tableSpec.location, - ctas.tableSpec.comment, - ctas.tableSpec.serde, - ctas.tableSpec.external) + tableSpec.properties, + tableSpec.provider, + tableSpec.optionExpression, + tableSpec.location, + tableSpec.comment, + tableSpec.serde, + tableSpec.external) case rtas: ReplaceTableAsSelect => + val tableSpec = rtas.tableSpec.asInstanceOf[UnresolvedTableSpec] TableSpec( rtas.name.asInstanceOf[UnresolvedIdentifier].nameParts, Some(rtas.query).filter(_.resolved).map(_.schema), rtas.partitioning, - rtas.tableSpec.properties, - rtas.tableSpec.provider, - rtas.optionsListExpressions, - rtas.tableSpec.location, - rtas.tableSpec.comment, - rtas.tableSpec.serde) + tableSpec.properties, + tableSpec.provider, + tableSpec.optionExpression, + tableSpec.location, + tableSpec.comment, + tableSpec.serde) case other => fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" + s" from query, got ${other.getClass.getName}.") @@ -2564,7 +2568,7 @@ class DDLParserSuite extends AnalysisTest { 
Seq.empty[Transform], Map.empty[String, String], None, - OptionsListExpressions(Seq.empty), + OptionList(Seq.empty), None, None, None) @@ -2615,7 +2619,7 @@ class DDLParserSuite extends AnalysisTest { val createTableResult = CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - None, None, None, false), false) + OptionList(Seq.empty), None, None, None, false), false) // Parse the CREATE TABLE statement twice, swapping the order of the NOT NULL and DEFAULT // options, to make sure that the parser accepts any ordering of these options. comparePlans(parsePlan( @@ -2628,7 +2632,7 @@ class DDLParserSuite extends AnalysisTest { "b STRING NOT NULL DEFAULT \"abc\") USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, false), false)) // These ALTER TABLE statements should parse successfully. 
comparePlans( parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"), @@ -2784,12 +2788,12 @@ class DDLParserSuite extends AnalysisTest { "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, false), false)) comparePlans(parsePlan( "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, Seq.empty[Transform], UnresolvedTableSpec(Map.empty[String, String], Some("parquet"), - None, None, None, false), false)) + OptionList(Seq.empty), None, None, None, false), false)) // Two generation expressions checkError( exception = parseException("CREATE TABLE my_tab(a INT, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a3cb12307fb..34b7c21db1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OptionList, OverwriteByExpression, 
OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -329,6 +329,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), + optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, @@ -593,6 +594,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), + optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, @@ -612,6 +614,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val tableSpec = UnresolvedTableSpec( properties = Map.empty, provider = Some(source), + optionExpression = OptionList(Seq.empty), location = extraOptions.get("path"), comment = extraOptions.get(TableCatalog.PROP_COMMENT), serde = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala index 101dd7ec299..6202fede568 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation} import 
org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.IntegerType @@ -110,6 +110,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) val tableSpec = UnresolvedTableSpec( properties = properties.toMap, provider = provider, + optionExpression = OptionList(Seq.empty), location = None, comment = None, serde = None, @@ -198,6 +199,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) val tableSpec = UnresolvedTableSpec( properties = properties.toMap, provider = provider, + optionExpression = OptionList(Seq.empty), location = None, comment = None, serde = None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 515b0bb90bd..fb1e9bcc591 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -158,7 +158,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. 
- case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, _, _) => + case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde, ctas = false) @@ -170,7 +170,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } case c @ CreateTableAsSelect( - ResolvedV1Identifier(ident), _, _, tableSpec: ResolvedTableSpec, writeOptions, _, _, _) => + ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, writeOptions, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, tableSpec.options ++ writeOptions, @@ -193,7 +193,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the // session catalog and the table provider is not v2. - case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _, _) => + case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( @@ -202,7 +202,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) c } - case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _, _) => + case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) => val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName) if (!isV2Provider(provider)) { throw QueryCompilationErrors.unsupportedTableOperationError( @@ -444,7 +444,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) private def constructV1TableCmd( query: Option[LogicalPlan], - tableSpec: TableSpec, + tableSpec: TableSpecBase, ident: TableIdentifier, tableSchema: StructType, partitioning: Seq[Transform], diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 359a6f43f0b..f7b18e6a7a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -172,7 +172,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning, - tableSpec, ifNotExists, unresolvedOptionsList) => + tableSpec: TableSpec, ifNotExists) => ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident) val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( @@ -183,8 +183,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema), partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil - case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec, - options, ifNotExists, true, unresolvedOptionsList) => + case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec: TableSpec, + options, ifNotExists, true) => catalog match { case staging: StagingTableCatalog => AtomicCreateTableAsSelectExec(staging, ident, parts, query, @@ -198,8 +198,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil case ReplaceTable( - ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate, - unresolvedOptionsList) => + ResolvedIdentifier(catalog, ident), schema, parts, tableSpec: TableSpec, 
orCreate) => ResolveDefaultColumns.validateCatalogForDefaultValue(schema, catalog.asTableCatalog, ident) val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( @@ -218,7 +217,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident), - parts, query, tableSpec, options, orCreate, true, unresolvedOptionsList) => + parts, query, tableSpec: TableSpec, options, orCreate, true) => catalog match { case staging: StagingTableCatalog => AtomicReplaceTableAsSelectExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 2aac82a990e..76c89bfa4a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionsListExpressions, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View} import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper} import org.apache.spark.sql.errors.QueryCompilationErrors @@ -662,12 +662,13 
@@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { None } - val newOptions = OptionsListExpressions(options.map { case (key, value) => + val newOptions = OptionList(options.map { case (key, value) => (key, Literal(value).asInstanceOf[Expression]) }.toSeq) val tableSpec = UnresolvedTableSpec( properties = Map(), provider = Some(source), + optionExpression = newOptions, location = location, comment = { if (description.isEmpty) None else Some(description) }, serde = None, @@ -678,8 +679,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { tableSchema = schema, partitioning = Seq(), tableSpec = tableSpec, - ignoreIfExists = false, - optionsListExpressions = newOptions) + ignoreIfExists = false) sparkSession.sessionState.executePlan(plan).toRdd sparkSession.table(tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index f913faa030d..12977987f08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, OptionList, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} @@ -293,6 +293,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val tableSpec 
= UnresolvedTableSpec( Map.empty[String, String], Some(source), + OptionList(Seq.empty), extraOptions.get("path"), None, None, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 4fe7809162f..51d15a666db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier} -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, OptionList, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -51,7 +51,8 @@ class V2CommandsCaseSensitivitySuite Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => - val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) + val tableSpec = + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.identity(ref) :: Nil, @@ -74,7 +75,8 @@ 
class V2CommandsCaseSensitivitySuite Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => - val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) + val tableSpec = + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) val plan = CreateTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, ref) :: Nil, @@ -98,7 +100,8 @@ class V2CommandsCaseSensitivitySuite Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("ID", "iD").foreach { ref => - val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) + val tableSpec = + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) val plan = ReplaceTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.identity(ref) :: Nil, @@ -121,7 +124,8 @@ class V2CommandsCaseSensitivitySuite Seq(true, false).foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { Seq("POINT.X", "point.X", "poInt.x", "poInt.X").foreach { ref => - val tableSpec = UnresolvedTableSpec(Map.empty, None, None, None, None, false) + val tableSpec = + UnresolvedTableSpec(Map.empty, None, OptionList(Seq.empty), None, None, None, false) val plan = ReplaceTableAsSelect( UnresolvedIdentifier(Array("table_name")), Expressions.bucket(4, ref) :: Nil, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org