This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2aa06fcf160 [SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused method in QueryCompilationErrors 2aa06fcf160 is described below commit 2aa06fcf1607bbad9e09649e587493032e739e35 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Tue Sep 26 19:35:27 2023 +0800 [SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused method in QueryCompilationErrors ### What changes were proposed in this pull request? The pr aims to - merge _LEGACY_ERROR_TEMP_1113 into UNSUPPORTED_FEATURE.TABLE_OPERATION - delete some unused method in QueryCompilationErrors - refactoring some methods to reduce call hierarchy ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA - Manually test ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43044 from panbingkun/LEGACY_ERROR_TEMP_1113. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/resources/error/error-classes.json | 5 -- .../spark/sql/catalyst/plans/logical/object.scala | 12 ++- .../spark/sql/errors/QueryCompilationErrors.scala | 88 +++++++++------------- .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../datasources/v2/TableCapabilityCheck.scala | 2 +- .../streaming/test/DataStreamTableAPISuite.scala | 13 +++- 6 files changed, 57 insertions(+), 65 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 9bcbcbc1962..5d827c67482 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -4097,11 +4097,6 @@ "DESCRIBE does not support partition for v2 tables." ] }, - "_LEGACY_ERROR_TEMP_1113" : { - "message" : [ - "Table <table> does not support <cmd>." - ] - }, "_LEGACY_ERROR_TEMP_1114" : { "message" : [ "The streaming sources in a query do not have a common supported execution mode.", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index d4851019db8..9bf8db0b4fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -727,16 +727,20 @@ object JoinWith { if a.sameRef(b) => catalyst.expressions.EqualTo( plan.left.resolveQuoted(a.name, resolver).getOrElse( - throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)), + throw QueryCompilationErrors.unresolvedColumnError( + a.name, plan.left.schema.fieldNames)), plan.right.resolveQuoted(b.name, resolver).getOrElse( - throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames))) + throw QueryCompilationErrors.unresolvedColumnError( + b.name, plan.right.schema.fieldNames))) case catalyst.expressions.EqualNullSafe(a: AttributeReference, b: AttributeReference) if a.sameRef(b) => catalyst.expressions.EqualNullSafe( plan.left.resolveQuoted(a.name, resolver).getOrElse( - throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)), + throw QueryCompilationErrors.unresolvedColumnError( + a.name, plan.left.schema.fieldNames)), plan.right.resolveQuoted(b.name, resolver).getOrElse( - throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames))) + throw QueryCompilationErrors.unresolvedColumnError( + b.name, plan.right.schema.fieldNames))) } } plan.copy(condition = cond) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 3536626d239..9d2b1225825 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -818,10 +818,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("hintName" -> hintName)) } - def attributeNameSyntaxError(name: String): Throwable = { - DataTypeErrors.attributeNameSyntaxError(name) - } - def starExpandDataTypeNotSupportedError(attributes: Seq[String]): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1050", @@ -868,6 +864,40 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "operation" -> operation)) } + private def unsupportedTableOperationError( + tableName: String, + operation: String): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + messageParameters = Map( + "tableName" -> toSQLId(tableName), + "operation" -> operation)) + } + + def unsupportedBatchReadError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "batch scan") + } + + def unsupportedStreamingScanError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "either micro-batch or continuous scan") + } + + def unsupportedAppendInBatchModeError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "append in batch mode") + } + + def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "dynamic overwrite in batch mode") + } + + def unsupportedTruncateInBatchModeError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "truncate in batch mode") + } + + def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = { + unsupportedTableOperationError(table.name(), "overwrite by filter in batch mode") + } + def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = { new AnalysisException( errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION", @@ -1292,38 +1322,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat new CannotReplaceMissingTableException(tableIdentifier, cause) } - private def unsupportedTableOperationError(table: Table, cmd: String): Throwable = { - new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1113", - messageParameters = Map( - "table" -> table.name, - "cmd" -> cmd)) - } - - def unsupportedBatchReadError(table: Table): Throwable = { - unsupportedTableOperationError(table, "batch scan") - } - - def unsupportedMicroBatchOrContinuousScanError(table: Table): Throwable = { - unsupportedTableOperationError(table, "either micro-batch or continuous scan") - } - - def unsupportedAppendInBatchModeError(table: Table): Throwable = { - unsupportedTableOperationError(table, "append in batch mode") - } - - def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = { - unsupportedTableOperationError(table, "dynamic overwrite in batch mode") - } - - def unsupportedTruncateInBatchModeError(table: Table): Throwable = { - unsupportedTableOperationError(table, "truncate in batch mode") - } - - def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = { - unsupportedTableOperationError(table, "overwrite by filter in batch mode") - } - def streamingSourcesDoNotSupportCommonExecutionModeError( microBatchSources: Seq[String], continuousSources: Seq[String]): Throwable = { @@ -2395,10 +2393,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("windowExpressions" -> windowExpressions.toString())) } - def charOrVarcharTypeAsStringUnsupportedError(): Throwable = { - DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError() - } - def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = { new AnalysisException( errorClass = "INVALID_FORMAT.ESC_IN_THE_MIDDLE", @@ -3164,24 +3158,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("expr" -> expr.sql)) } - def unresolvedColumnWithSuggestionError( - objectName: String, suggestion: String): AnalysisException = { + def unresolvedColumnError(colName: String, fields: Array[String]): AnalysisException = { new AnalysisException( errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", messageParameters = Map( - "objectName" -> toSQLId(objectName), - "proposal" -> suggestion + "objectName" -> toSQLId(colName), + "proposal" -> fields.map(toSQLId).mkString(", ") ) ) } - def resolveException(colName: String, fields: Array[String]): AnalysisException = { - QueryCompilationErrors.unresolvedColumnWithSuggestionError( - colName, - fields.map(toSQLId).mkString(", ") - ) - } - def cannotParseIntervalError(delayThreshold: String, e: Throwable): Throwable = { val threshold = if (delayThreshold == null) "" else delayThreshold new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f07496e6430..68ea6dcffb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -248,7 +248,7 @@ class Dataset[T] private[sql]( private[sql] def resolve(colName: String): NamedExpression = { val resolver = sparkSession.sessionState.analyzer.resolver queryExecution.analyzed.resolveQuoted(colName, resolver) - .getOrElse(throw QueryCompilationErrors.resolveException(colName, schema.fieldNames)) + .getOrElse(throw QueryCompilationErrors.unresolvedColumnError(colName, schema.fieldNames)) } private[sql] def numericColumns: Seq[Expression] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala index 28431972253..a3afaa36ab9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala @@ -42,7 +42,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) { throw QueryCompilationErrors.unsupportedBatchReadError(r.table) case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => - throw QueryCompilationErrors.unsupportedMicroBatchOrContinuousScanError(r.table) + throw QueryCompilationErrors.unsupportedStreamingScanError(r.table) // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a // a logical plan for streaming write. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index d049f27c21e..7bf81fb9865 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -113,9 +113,16 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo") - intercept[AnalysisException] { - spark.readStream.table(tableIdentifier) - }.message.contains("does not support either micro-batch or continuous scan") + checkError( + exception = intercept[AnalysisException] { + spark.readStream.table(tableIdentifier) + }, + errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION", + parameters = Map( + "tableName" -> "`testcat`.`table_name`", + "operation" -> "either micro-batch or continuous scan" + ) + ) } test("read: read table with custom catalog") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org