This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bd6b751a5b1 [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements bd6b751a5b1 is described below commit bd6b751a5b1c0b9b81039ca554d00e5ef841205d Author: Allison Portis <allison.por...@databricks.com> AuthorDate: Mon Feb 27 16:40:38 2023 +0800 [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements ### What changes were proposed in this pull request? Enables creating generated columns in CREATE/REPLACE TABLE statements by specifying a generation expression for a column with GENERATED ALWAYS AS expr. For example, the following will be supported: ```sql CREATE TABLE default.example ( time TIMESTAMP, date DATE GENERATED ALWAYS AS (CAST(time AS DATE)) ) ``` To be more specific, this PR: 1. Adds parser support for `GENERATED ALWAYS AS expr` in create/replace table statements. Generation expressions are temporarily stored in the field's metadata, and are then parsed/verified in `DataSourceV2Strategy` and used to instantiate v2 [Column](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java). 2. Adds `TableCatalog::capabilities()` and `TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS`. This will be used to determine whether to allow specifying generation expressions or whether to throw an exception. ### Why are the changes needed? `GENERATED ALWAYS AS` is part of the SQL standard. These changes will allow defining generated columns in create/replace table statements in Spark SQL. ### Does this PR introduce _any_ user-facing change? Using `GENERATED ALWAYS AS expr` in CREATE/REPLACE table statements will no longer throw a parsing error. When used with a supporting table catalog, the query should proceed; when used with a non-supporting catalog, an analysis exception is thrown. 
Previous behavior: ``` spark-sql> CREATE TABLE default.example ( > time TIMESTAMP, > date DATE GENERATED ALWAYS AS (CAST(time AS DATE)) > ) > ; Error in query: Syntax error at or near 'GENERATED'(line 3, pos 14) == SQL == CREATE TABLE default.example ( time TIMESTAMP, date DATE GENERATED ALWAYS AS (CAST(time AS DATE)) --------------^^^ ) ``` New behavior: ``` AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `my_tab` does not support creating generated columns with GENERATED ALWAYS AS expressions. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog". ``` ### How was this patch tested? Adds unit tests Closes #38823 from allisonport-db/parser-support. Authored-by: Allison Portis <allison.por...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- core/src/main/resources/error/error-classes.json | 10 + docs/sql-ref-ansi-compliance.md | 2 + .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 2 + .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 9 + .../apache/spark/sql/connector/catalog/Column.java | 35 +++- .../spark/sql/connector/catalog/TableCatalog.java | 7 + .../connector/catalog/TableCatalogCapability.java | 48 +++++ .../spark/sql/catalyst/parser/AstBuilder.scala | 37 +++- .../spark/sql/catalyst/util/GeneratedColumn.scala | 202 +++++++++++++++++++ .../sql/connector/catalog/CatalogV2Util.scala | 59 ++++-- .../spark/sql/errors/QueryCompilationErrors.scala | 10 + .../spark/sql/internal/connector/ColumnImpl.scala | 1 + .../spark/sql/catalyst/parser/DDLParserSuite.scala | 47 ++++- .../connector/catalog/InMemoryTableCatalog.scala | 5 + .../execution/datasources/DataSourceStrategy.scala | 9 +- .../datasources/v2/DataSourceV2Strategy.scala | 8 +- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 222 +++++++++++++++++++++ .../spark/sql/execution/command/DDLSuite.scala | 11 + 18 files 
changed, 694 insertions(+), 30 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 8910ca86de4..408c97acaa3 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -571,6 +571,11 @@ ], "sqlState" : "42809" }, + "GENERATED_COLUMN_WITH_DEFAULT_VALUE" : { + "message" : [ + "A column cannot have both a default value and a generation expression but column <colName> has default value: (<defaultValue>) and generation expression: (<genExpr>)." + ] + }, "GRAPHITE_SINK_INVALID_PROTOCOL" : { "message" : [ "Invalid Graphite protocol: <protocol>." @@ -1607,6 +1612,11 @@ }, "sqlState" : "0A000" }, + "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN" : { + "message" : [ + "Cannot create generated column <fieldName> with generation expression <expressionStr> because <reason>." + ] + }, "UNSUPPORTED_EXPR_FOR_OPERATOR" : { "message" : [ "A query operator contains one or more unsupported expressions. Consider to rewrite it to avoid window functions, aggregate functions, and generator functions in the WHERE clause.", diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 1501e14c604..4124e958e39 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -353,6 +353,7 @@ Below is a list of all the keywords in Spark SQL. |AFTER|non-reserved|non-reserved|non-reserved| |ALL|reserved|non-reserved|reserved| |ALTER|non-reserved|non-reserved|reserved| +|ALWAYS|non-reserved|non-reserved|non-reserved| |ANALYZE|non-reserved|non-reserved|non-reserved| |AND|reserved|non-reserved|reserved| |ANTI|non-reserved|strict-non-reserved|non-reserved| @@ -451,6 +452,7 @@ Below is a list of all the keywords in Spark SQL. 
|FULL|reserved|strict-non-reserved|reserved| |FUNCTION|non-reserved|non-reserved|reserved| |FUNCTIONS|non-reserved|non-reserved|non-reserved| +|GENERATED|non-reserved|non-reserved|non-reserved| |GLOBAL|non-reserved|non-reserved|reserved| |GRANT|reserved|non-reserved|reserved| |GROUP|reserved|non-reserved|reserved| diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 38f52901aa2..6d0862290cf 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -91,6 +91,7 @@ ADD: 'ADD'; AFTER: 'AFTER'; ALL: 'ALL'; ALTER: 'ALTER'; +ALWAYS: 'ALWAYS'; ANALYZE: 'ANALYZE'; AND: 'AND'; ANTI: 'ANTI'; @@ -189,6 +190,7 @@ FROM: 'FROM'; FULL: 'FULL'; FUNCTION: 'FUNCTION'; FUNCTIONS: 'FUNCTIONS'; +GENERATED: 'GENERATED'; GLOBAL: 'GLOBAL'; GRANT: 'GRANT'; GROUP: 'GROUP'; diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 7c073411188..aa5f538bbf6 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -1035,9 +1035,14 @@ createOrReplaceTableColType colDefinitionOption : NOT NULL | defaultExpression + | generationExpression | commentSpec ; +generationExpression + : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN + ; + complexColTypeList : complexColType (COMMA complexColType)* ; @@ -1183,6 +1188,7 @@ ansiNonReserved : ADD | AFTER | ALTER + | ALWAYS | ANALYZE | ANTI | ANY_VALUE @@ -1252,6 +1258,7 @@ ansiNonReserved | FORMATTED | FUNCTION | FUNCTIONS + | GENERATED | GLOBAL | GROUPING | HOUR @@ -1441,6 +1448,7 @@ nonReserved | AFTER | ALL | ALTER + | ALWAYS | ANALYZE | 
AND | ANY @@ -1535,6 +1543,7 @@ nonReserved | FROM | FUNCTION | FUNCTIONS + | GENERATED | GLOBAL | GRANT | GROUP diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java index d2c8f25e739..b191438dbc3 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java @@ -33,6 +33,8 @@ import org.apache.spark.sql.types.DataType; * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in * {@link Table#columns()} by calling the static {@code create} functions of this interface to * create it. + * <p> + * A column cannot have both a default value and a generation expression. */ @Evolving public interface Column { @@ -42,7 +44,16 @@ public interface Column { } static Column create(String name, DataType dataType, boolean nullable) { - return create(name, dataType, nullable, null, null, null); + return create(name, dataType, nullable, null, null); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, null, null, metadataInJSON); } static Column create( @@ -52,7 +63,18 @@ public interface Column { String comment, ColumnDefaultValue defaultValue, String metadataInJSON) { - return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON); + return new ColumnImpl(name, dataType, nullable, comment, defaultValue, null, metadataInJSON); + } + + static Column create( + String name, + DataType dataType, + boolean nullable, + String comment, + String generationExpression, + String metadataInJSON) { + return new ColumnImpl(name, dataType, nullable, comment, null, + generationExpression, metadataInJSON); } /** @@ -82,6 +104,15 @@ public interface Column { @Nullable 
ColumnDefaultValue defaultValue(); + /** + * Returns the generation expression of this table column. Null means no generation expression. + * <p> + * The generation expression is stored as spark SQL dialect. It is up to the data source to verify + * expression compatibility and reject writes as necessary. + */ + @Nullable + String generationExpression(); + /** * Returns the column metadata in JSON format. */ diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index 82622d65205..eb442ad38bd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.errors.QueryCompilationErrors; import org.apache.spark.sql.types.StructType; +import java.util.Collections; import java.util.Map; +import java.util.Set; /** * Catalog methods for working with Tables. @@ -78,6 +80,11 @@ public interface TableCatalog extends CatalogPlugin { */ String OPTION_PREFIX = "option."; + /** + * @return the set of capabilities for this TableCatalog + */ + default Set<TableCatalogCapability> capabilities() { return Collections.emptySet(); } + /** * List the tables in a namespace from the catalog. * <p> diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java new file mode 100644 index 00000000000..84a2a0f7648 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; + +/** + * Capabilities that can be provided by a {@link TableCatalog} implementation. + * <p> + * TableCatalogs use {@link TableCatalog#capabilities()} to return a set of capabilities. Each + * capability signals to Spark that the catalog supports a feature identified by the capability. + * For example, returning {@link #SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS} allows Spark to + * accept {@code GENERATED ALWAYS AS} expressions in {@code CREATE TABLE} statements. + * + * @since 3.4.0 + */ +@Evolving +public enum TableCatalogCapability { + + /** + * Signals that the TableCatalog supports defining generated columns upon table creation in SQL. + * <p> + * Without this capability, any create/replace table statements with a generated column defined + * in the table schema will throw an exception during analysis. + * <p> + * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + * <p> + * Generation expression are included in the column definition for APIs like + * {@link TableCatalog#createTable}. + * See {@link Column#generationExpression()}. 
+ */ + SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8957794ad95..1c5eac4ce17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin -import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, ResolveDefaultColumns} +import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, GeneratedColumn, IntervalUtils, ResolveDefaultColumns} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -3002,6 +3002,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit // Check that no duplicates exist among any CREATE TABLE column options specified. 
var nullable = true var defaultExpression: Option[DefaultExpressionContext] = None + var generationExpression: Option[GenerationExpressionContext] = None var commentSpec: Option[CommentSpecContext] = None ctx.colDefinitionOption().asScala.foreach { option => if (option.NULL != null) { @@ -3018,6 +3019,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } defaultExpression = Some(expr) } + Option(option.generationExpression()).foreach { expr => + if (generationExpression.isDefined) { + throw QueryParsingErrors.duplicateCreateTableColumnOption( + option, colName.getText, "GENERATED ALWAYS AS") + } + generationExpression = Some(expr) + } Option(option.commentSpec()).foreach { spec => if (commentSpec.isDefined) { throw QueryParsingErrors.duplicateCreateTableColumnOption( @@ -3042,6 +3050,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit throw QueryParsingErrors.defaultColumnNotEnabledError(ctx) } } + // Add the 'GENERATED ALWAYS AS expression' clause in the column definition, if any, to the + // column metadata. + generationExpression.map(visitGenerationExpression).foreach { field => + builder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, field) + } val name: String = colName.getText @@ -3100,11 +3113,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit string(visitStringLit(ctx.stringLit)) } - /** - * Create a default string. - */ - override def visitDefaultExpression(ctx: DefaultExpressionContext): String = withOrigin(ctx) { - val exprCtx = ctx.expression() + private def verifyAndGetExpression(exprCtx: ExpressionContext): String = { // Make sure it can be converted to Catalyst expressions. expression(exprCtx) // Extract the raw expression text so that we can save the user provided text. 
We don't @@ -3116,6 +3125,22 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit exprCtx.getStart.getInputStream.getText(new Interval(start, end)) } + /** + * Create a default string. + */ + override def visitDefaultExpression(ctx: DefaultExpressionContext): String = + withOrigin(ctx) { + verifyAndGetExpression(ctx.expression()) + } + + /** + * Create a generation expression string. + */ + override def visitGenerationExpression(ctx: GenerationExpressionContext): String = + withOrigin(ctx) { + verifyAndGetExpression(ctx.expression()) + } + /** * Create an optional comment string. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala new file mode 100644 index 00000000000..6ff5df98d3c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression} +import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project} +import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog +import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, StructField, StructType} + +/** + * This object contains utility methods and values for Generated Columns + */ +object GeneratedColumn { + + /** + * The metadata key for saving a generation expression in a generated column's metadata. This is + * only used internally and connectors should access generation expressions from the V2 columns. 
+ */ + val GENERATION_EXPRESSION_METADATA_KEY = "GENERATION_EXPRESSION" + + /** Parser for parsing generation expression SQL strings */ + private lazy val parser = new CatalystSqlParser() + + /** + * Whether the given `field` is a generated column + */ + def isGeneratedColumn(field: StructField): Boolean = { + field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY) + } + + /** + * Returns the generation expression stored in the column metadata if it exists + */ + def getGenerationExpression(field: StructField): Option[String] = { + if (isGeneratedColumn(field)) { + Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY)) + } else { + None + } + } + + /** + * Whether the `schema` has one or more generated columns + */ + def hasGeneratedColumns(schema: StructType): Boolean = { + schema.exists(isGeneratedColumn) + } + + /** + * Parse and analyze `expressionStr` and perform verification. This means: + * - The expression cannot reference itself + * - The expression cannot reference other generated columns + * - No user-defined expressions + * - The expression must be deterministic + * - The expression data type can be safely up-cast to the destination column data type + * - No subquery expressions + * + * Throws an [[AnalysisException]] if the expression cannot be converted or is an invalid + * generation expression according to the above rules. 
+ */ + private def analyzeAndVerifyExpression( + expressionStr: String, + fieldName: String, + dataType: DataType, + schema: StructType, + statementType: String): Unit = { + def unsupportedExpressionError(reason: String): AnalysisException = { + new AnalysisException( + errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN", + messageParameters = Map( + "fieldName" -> fieldName, + "expressionStr" -> expressionStr, + "reason" -> reason)) + } + + // Parse the expression string + val parsed: Expression = try { + parser.parseExpression(expressionStr) + } catch { + case ex: ParseException => + // Shouldn't be possible since we check that the expression is a valid catalyst expression + // during parsing + throw SparkException.internalError( + s"Failed to execute $statementType command because the column $fieldName has " + + s"generation expression $expressionStr which fails to parse as a valid expression:" + + s"\n${ex.getMessage}") + } + // Don't allow subquery expressions + if (parsed.containsPattern(PLAN_EXPRESSION)) { + throw unsupportedExpressionError("subquery expressions are not allowed for generated columns") + } + // Analyze the parsed result + val allowedBaseColumns = schema + .filterNot(_.name == fieldName) // Can't reference itself + .filterNot(isGeneratedColumn) // Can't reference other generated columns + val relation = new LocalRelation(StructType(allowedBaseColumns).toAttributes) + val plan = try { + val analyzer: Analyzer = GeneratedColumnAnalyzer + val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), relation)) + analyzer.checkAnalysis(analyzed) + analyzed + } catch { + case ex: AnalysisException => + // Improve error message if possible + if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") { + ex.messageParameters.get("objectName").foreach { unresolvedCol => + val resolver = SQLConf.get.resolver + // Whether `col` = `unresolvedCol` taking into account case-sensitivity + def isUnresolvedCol(col: String) = + 
resolver(unresolvedCol, QueryCompilationErrors.toSQLId(col)) + // Check whether the unresolved column is this column + if (isUnresolvedCol(fieldName)) { + throw unsupportedExpressionError("generation expression cannot reference itself") + } + // Check whether the unresolved column is another generated column in the schema + if (schema.exists(col => isGeneratedColumn(col) && isUnresolvedCol(col.name))) { + throw unsupportedExpressionError( + "generation expression cannot reference another generated column") + } + } + } + if (ex.getErrorClass == "UNRESOLVED_ROUTINE") { + // Cannot resolve function using built-in catalog + ex.messageParameters.get("routineName").foreach { fnName => + throw unsupportedExpressionError(s"failed to resolve $fnName to a built-in function") + } + } + throw ex + } + val analyzed = plan.collectFirst { + case Project(Seq(a: Alias), _: LocalRelation) => a.child + }.get + if (!analyzed.deterministic) { + throw unsupportedExpressionError("generation expression is not deterministic") + } + if (!Cast.canUpCast(analyzed.dataType, dataType)) { + throw unsupportedExpressionError( + s"generation expression data type ${analyzed.dataType.simpleString} " + + s"is incompatible with column data type ${dataType.simpleString}") + } + } + + /** + * For any generated columns in `schema`, parse, analyze and verify the generation expression. + */ + private def verifyGeneratedColumns(schema: StructType, statementType: String): Unit = { + schema.foreach { field => + getGenerationExpression(field).foreach { expressionStr => + analyzeAndVerifyExpression(expressionStr, field.name, field.dataType, schema, statementType) + } + } + } + + /** + * If `schema` contains any generated columns: + * 1) Check whether the table catalog supports generated columns. Otherwise throw an error. + * 2) Parse, analyze and verify the generation expressions for any generated columns. 
+ */ + def validateGeneratedColumns( + schema: StructType, + catalog: TableCatalog, + ident: Seq[String], + statementType: String): Unit = { + if (hasGeneratedColumns(schema)) { + if (!catalog.capabilities().contains( + TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) { + throw QueryCompilationErrors.generatedColumnsUnsupported(ident) + } + GeneratedColumn.verifyGeneratedColumns(schema, statementType) + } + } +} + +/** + * Analyzer for processing generated column expressions using built-in functions only. + */ +object GeneratedColumnAnalyzer extends Analyzer( + new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) { +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 9b481356fa6..12a8db92363 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,9 +22,11 @@ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} +import org.apache.spark.sql.catalyst.util.GeneratedColumn import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -471,43 +473,62 @@ private[sql] object CatalogV2Util { /** * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column - * comment and default value. 
This is mainly used to generate DS v2 columns from table schema in - * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable and related APIs. + * comment and default value or generation expression. This is mainly used to generate DS v2 + * columns from table schema in DDL commands, so that Spark can pass DS v2 columns to DS v2 + * createTable and related APIs. */ def structTypeToV2Columns(schema: StructType): Array[Column] = { schema.fields.map(structFieldToV2Column) } private def structFieldToV2Column(f: StructField): Column = { - def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata): Column = { - val metadataJSON = if (metadata == Metadata.empty) { + def metadataAsJson(metadata: Metadata): String = { + if (metadata == Metadata.empty) { null } else { metadata.json } - Column.create( - f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, metadataJSON) } - if (f.getCurrentDefaultValue().isDefined && f.getExistenceDefaultValue().isDefined) { + def metadataWithKeysRemoved(keys: Seq[String]): Metadata = { + keys.foldLeft(new MetadataBuilder().withMetadata(f.metadata)) { + (builder, key) => builder.remove(key) + }.build() + } + + val isDefaultColumn = f.getCurrentDefaultValue().isDefined && + f.getExistenceDefaultValue().isDefined + val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f) + if (isDefaultColumn && isGeneratedColumn) { + throw new AnalysisException( + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + messageParameters = Map( + "colName" -> f.name, + "defaultValue" -> f.getCurrentDefaultValue().get, + "genExpr" -> GeneratedColumn.getGenerationExpression(f).get + ) + ) + } + + if (isDefaultColumn) { val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY) assert(e.resolved && e.foldable, "The existence default value must be a simple SQL string that is resolved and foldable, " + "but got: " + f.getExistenceDefaultValue().get) val defaultValue = new ColumnDefaultValue( f.getCurrentDefaultValue().get, 
LiteralValue(e.eval(), f.dataType)) - val cleanedMetadata = new MetadataBuilder() - .withMetadata(f.metadata) - .remove("comment") - .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY) - .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY) - .build() - createV2Column(defaultValue, cleanedMetadata) + val cleanedMetadata = metadataWithKeysRemoved( + Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, + metadataAsJson(cleanedMetadata)) + } else if (isGeneratedColumn) { + val cleanedMetadata = metadataWithKeysRemoved( + Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY)) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, + GeneratedColumn.getGenerationExpression(f).get, metadataAsJson(cleanedMetadata)) } else { - val cleanedMetadata = new MetadataBuilder() - .withMetadata(f.metadata) - .remove("comment") - .build() - createV2Column(null, cleanedMetadata) + val cleanedMetadata = metadataWithKeysRemoved(Seq("comment")) + Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, + metadataAsJson(cleanedMetadata)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 556b3a62da3..1c257966aaf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3405,6 +3405,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } } + def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + messageParameters = Map( + "tableName" -> toSQLId(nameParts), + "operation" -> "generated columns" + ) + ) + } + def ambiguousLateralColumnAliasError(name: 
String, numOfMatches: Int): Throwable = { new AnalysisException( errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala index 5ab3f83eeae..2a67ffc4bbe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala @@ -27,4 +27,5 @@ case class ColumnImpl( nullable: Boolean, comment: String, defaultValue: ColumnDefaultValue, + generationExpression: String, metadataInJSON: String) extends Column diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 0efbd75ad93..5196d19ffcd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkThrowable import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal} import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _} -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns} import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket @@ -2717,4 +2717,49 @@ class DDLParserSuite extends AnalysisTest { context = ExpectedContext( fragment = 
"b STRING COMMENT \"abc\" NOT NULL COMMENT \"abc\"", start = 27, stop = 71)) } + + test("SPARK-41290: implement parser support for GENERATED ALWAYS AS columns in tables") { + val schemaWithGeneratedColumn = new StructType() + .add("a", IntegerType, true) + .add("b", IntegerType, false, + new MetadataBuilder() + .putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, "a+1") + .build()) + comparePlans(parsePlan( + "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), + CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, + Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), + Map.empty[String, String], None, None, None, false), false)) + comparePlans(parsePlan( + "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"), + ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn, + Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"), + Map.empty[String, String], None, None, None, false), false)) + // Two generation expressions + checkError( + exception = parseException("CREATE TABLE my_tab(a INT, " + + "b INT GENERATED ALWAYS AS (a + 1) GENERATED ALWAYS AS (a + 2)) USING PARQUET"), + errorClass = "CREATE_TABLE_COLUMN_OPTION_DUPLICATE", + parameters = Map("columnName" -> "b", "optionName" -> "GENERATED ALWAYS AS"), + context = ExpectedContext( + fragment = "b INT GENERATED ALWAYS AS (a + 1) GENERATED ALWAYS AS (a + 2)", + start = 27, + stop = 87 + ) + ) + // Empty expression + checkError( + exception = parseException( + "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS ()) USING PARQUET"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map("error" -> "')'", "hint" -> "") + ) + // No parenthesis + checkError( + exception = parseException( + "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING PARQUET"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map("error" -> "'a'", 
"hint" -> ": missing '('") + ) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 50bea2b8d2f..e82f203742b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -170,6 +170,11 @@ class BasicInMemoryTableCatalog extends TableCatalog { } class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces { + + override def capabilities: java.util.Set[TableCatalogCapability] = { + Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava + } + protected def allNamespaces: Seq[Seq[String]] = { (tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 8b985e82963..74539b54117 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 -import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.SupportsRead import 
org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue} @@ -136,6 +136,13 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] { val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( tableDesc.schema, tableDesc.provider, "CREATE TABLE", false) + + if (GeneratedColumn.hasGeneratedColumns(newSchema)) { + throw QueryCompilationErrors.generatedColumnsUnsupported( + Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get, + tableDesc.identifier.table)) + } + val newTableDesc = tableDesc.copy(schema = newSchema) CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b45de06371c..71ffe65b42a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder} +import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} 
import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns import org.apache.spark.sql.connector.catalog.index.SupportsIndex @@ -178,6 +178,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) + GeneratedColumn.validateGeneratedColumns( + newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE") + CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema), partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil @@ -201,6 +204,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val newSchema: StructType = ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults( schema, tableSpec.provider, "CREATE TABLE", false) + GeneratedColumn.validateGeneratedColumns( + newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE") + val v2Columns = structTypeToV2Columns(newSchema) catalog match { case staging: StagingTableCatalog => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index c4dabaec888..85984f1b2a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1422,6 +1422,228 @@ class DataSourceV2SQLSuiteV1Filter } } + test("SPARK-41290: Generated columns only allowed with TableCatalogs that " + + "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)))" + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + // InMemoryTableCatalog.capabilities() = 
{SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS} + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + // Can create table with a generated column + sql(s"$statement testcat.$tableDefinition USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + // BasicInMemoryTableCatalog.capabilities() = {} + withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) { + checkError( + exception = intercept[AnalysisException] { + sql("USE dummy") + sql(s"$statement dummy.$tableDefinition USING foo") + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map( + "tableName" -> "`my_tab`", + "operation" -> "generated columns" + ) + ) + } + } + } + + test("SPARK-41290: Column cannot have both a generation expression and a default value") { + val tblName = "my_tab" + val tableDefinition = + s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)) DEFAULT 0)" + withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") { + for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) { + withTable(s"testcat.$tblName") { + if (statement == "REPLACE TABLE") { + sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo") + } + checkError( + exception = intercept[AnalysisException] { + sql(s"$statement testcat.$tableDefinition USING foo") + }, + errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE", + parameters = Map( + "colName" -> "eventYear", + "defaultValue" -> "0", + "genExpr" -> "year(eventDate)") + ) + } + } + } + } + + test("SPARK-41290: Generated column expression must be valid generation expression") { + val tblName = "my_tab" + def checkUnsupportedGenerationExpression( + expr: String, + expectedReason: String, + genColType: String = "INT", + customTableDef: Option[String] = None): Unit = { + val tableDef = + s"CREATE TABLE testcat.$tblName(a INT, b $genColType GENERATED ALWAYS AS ($expr)) 
USING foo" + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(customTableDef.getOrElse(tableDef)) + }, + errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN", + parameters = Map( + "fieldName" -> "b", + "expressionStr" -> expr, + "reason" -> expectedReason) + ) + } + } + + // Expression cannot be resolved since it doesn't exist + checkUnsupportedGenerationExpression( + "not_a_function(a)", + "failed to resolve `not_a_function` to a built-in function" + ) + + // Expression cannot be resolved since it's not a built-in function + spark.udf.register("timesTwo", (x: Int) => x * 2) + checkUnsupportedGenerationExpression( + "timesTwo(a)", + "failed to resolve `timesTwo` to a built-in function" + ) + + // Generated column can't reference itself + checkUnsupportedGenerationExpression( + "b + 1", + "generation expression cannot reference itself" + ) + // Obeys case sensitivity when intercepting the error message + // Intercepts when case-insensitive + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkUnsupportedGenerationExpression( + "B + 1", + "generation expression cannot reference itself" + ) + } + // Doesn't intercept when case-sensitive + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (B + 1)) USING foo") + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`B`", "proposal" -> "`a`"), + context = ExpectedContext(fragment = "B", start = 0, stop = 0) + ) + } + } + // Respects case sensitivity when resolving + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(" + + "a INT, b INT GENERATED ALWAYS AS (B + 1), B INT) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + } + + 
// Generated column can't reference other generated columns + checkUnsupportedGenerationExpression( + "c + 1", + "generation expression cannot reference another generated column", + customTableDef = Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (c + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + // Respects case-insensitivity + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + checkUnsupportedGenerationExpression( + "C + 1", + "generation expression cannot reference another generated column", + customTableDef = Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (C + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + checkUnsupportedGenerationExpression( + "c + 1", + "generation expression cannot reference another generated column", + customTableDef = Some( + s"CREATE TABLE testcat.$tblName(a INT, " + + "b INT GENERATED ALWAYS AS (c + 1), C INT GENERATED ALWAYS AS (a + 1)) USING foo" + ) + ) + } + // Respects case sensitivity when resolving + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(" + + "a INT, A INT GENERATED ALWAYS AS (a + 1), b INT GENERATED ALWAYS AS (a + 1)) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + } + + // Generated column can't reference non-existent column + withTable(s"testcat.$tblName") { + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS (c + 1)) USING foo") + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`c`", "proposal" -> "`a`"), + context = ExpectedContext(fragment = "c", start = 0, stop = 0) + ) + } + + // Expression must be deterministic + checkUnsupportedGenerationExpression( + "rand()", + "generation expression is not deterministic" + ) + + // Data type is incompatible + 
checkUnsupportedGenerationExpression( + "a + 1", + "generation expression data type int is incompatible with column data type boolean", + "BOOLEAN" + ) + // But we allow valid up-casts + withTable(s"testcat.$tblName") { + sql(s"CREATE TABLE testcat.$tblName(a INT, b LONG GENERATED ALWAYS AS (a + 1)) USING foo") + assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName))) + } + + // No subquery expressions + checkUnsupportedGenerationExpression( + "(SELECT 1)", + "subquery expressions are not allowed for generated columns" + ) + checkUnsupportedGenerationExpression( + "(SELECT (SELECT 2) + 1)", // nested + "subquery expressions are not allowed for generated columns" + ) + checkUnsupportedGenerationExpression( + "(SELECT 1) + a", // refers to another column + "subquery expressions are not allowed for generated columns" + ) + withTable("other") { + sql("create table other(x INT) using parquet") + checkUnsupportedGenerationExpression( + "(select min(x) from other)", // refers to another table + "subquery expressions are not allowed for generated columns" + ) + } + checkUnsupportedGenerationExpression( + "(select min(x) from faketable)", // refers to a non-existent table + "subquery expressions are not allowed for generated columns" + ) + } + test("ShowCurrentNamespace: basic tests") { def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = { val schema = new StructType() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3d88d4f7ab9..4f3f993d7de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2181,6 +2181,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { assert(spark.sessionState.catalog.isRegisteredFunction(rand)) } } + + 
test("SPARK-41290: No generated columns with V1") { + checkError( + exception = intercept[AnalysisException] { + sql(s"create table t(a int, b int generated always as (a + 1)) using parquet") + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`", + "operation" -> "generated columns") + ) + } } object FakeLocalFsFileSystem { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org