This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aa06fcf160 [SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused methods in QueryCompilationErrors
2aa06fcf160 is described below

commit 2aa06fcf1607bbad9e09649e587493032e739e35
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Tue Sep 26 19:35:27 2023 +0800

    [SPARK-45271][SQL] Merge _LEGACY_ERROR_TEMP_1113 into TABLE_OPERATION & delete some unused methods in QueryCompilationErrors
    
    ### What changes were proposed in this pull request?
    This PR aims to:
    - merge _LEGACY_ERROR_TEMP_1113 into UNSUPPORTED_FEATURE.TABLE_OPERATION (see the sketch below)
    - delete some unused methods in QueryCompilationErrors
    - refactor some methods to reduce the call hierarchy
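
    For reference, a minimal sketch of the merged error path as it now looks inside QueryCompilationErrors (the AnalysisException constructor and the toSQLId quoting helper used here are internal to the sql package, so this compiles only in that context):
    ```scala
    // Shared private helper: every "table does not support <operation>" error
    // now raises the named UNSUPPORTED_FEATURE.TABLE_OPERATION error class
    // instead of the temporary _LEGACY_ERROR_TEMP_1113 class.
    private def unsupportedTableOperationError(
        tableName: String,
        operation: String): Throwable = {
      new AnalysisException(
        errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
        messageParameters = Map(
          "tableName" -> toSQLId(tableName),
          "operation" -> operation))
    }

    // Public wrappers delegate to it with the resolved table name, e.g.
    //   unsupportedTruncateInBatchModeError(table) ==
    //   unsupportedTableOperationError(table.name(), "truncate in batch mode")
    ```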
    
    ### Why are the changes needed?
    Replacing the temporary _LEGACY_ERROR_TEMP_1113 error class with the named UNSUPPORTED_FEATURE.TABLE_OPERATION class makes the error framework more consistent.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA
    - Manual testing
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43044 from panbingkun/LEGACY_ERROR_TEMP_1113.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |  5 --
 .../spark/sql/catalyst/plans/logical/object.scala  | 12 ++-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 88 +++++++++-------------
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../datasources/v2/TableCapabilityCheck.scala      |  2 +-
 .../streaming/test/DataStreamTableAPISuite.scala   | 13 +++-
 6 files changed, 57 insertions(+), 65 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 9bcbcbc1962..5d827c67482 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -4097,11 +4097,6 @@
       "DESCRIBE does not support partition for v2 tables."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1113" : {
-    "message" : [
-      "Table <table> does not support <cmd>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1114" : {
     "message" : [
       "The streaming sources in a query do not have a common supported 
execution mode.",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index d4851019db8..9bf8db0b4fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -727,16 +727,20 @@ object JoinWith {
           if a.sameRef(b) =>
           catalyst.expressions.EqualTo(
             plan.left.resolveQuoted(a.name, resolver).getOrElse(
-              throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)),
+              throw QueryCompilationErrors.unresolvedColumnError(
+                a.name, plan.left.schema.fieldNames)),
             plan.right.resolveQuoted(b.name, resolver).getOrElse(
-              throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames)))
+              throw QueryCompilationErrors.unresolvedColumnError(
+                b.name, plan.right.schema.fieldNames)))
         case catalyst.expressions.EqualNullSafe(a: AttributeReference, b: AttributeReference)
           if a.sameRef(b) =>
           catalyst.expressions.EqualNullSafe(
             plan.left.resolveQuoted(a.name, resolver).getOrElse(
-              throw QueryCompilationErrors.resolveException(a.name, plan.left.schema.fieldNames)),
+              throw QueryCompilationErrors.unresolvedColumnError(
+                a.name, plan.left.schema.fieldNames)),
             plan.right.resolveQuoted(b.name, resolver).getOrElse(
-              throw QueryCompilationErrors.resolveException(b.name, plan.right.schema.fieldNames)))
+              throw QueryCompilationErrors.unresolvedColumnError(
+                b.name, plan.right.schema.fieldNames)))
       }
     }
     plan.copy(condition = cond)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3536626d239..9d2b1225825 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -818,10 +818,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("hintName" -> hintName))
   }
 
-  def attributeNameSyntaxError(name: String): Throwable = {
-    DataTypeErrors.attributeNameSyntaxError(name)
-  }
-
  def starExpandDataTypeNotSupportedError(attributes: Seq[String]): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1050",
@@ -868,6 +864,40 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         "operation" -> operation))
   }
 
+  private def unsupportedTableOperationError(
+      tableName: String,
+      operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "operation" -> operation))
+  }
+
+  def unsupportedBatchReadError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "batch scan")
+  }
+
+  def unsupportedStreamingScanError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "either micro-batch or continuous scan")
+  }
+
+  def unsupportedAppendInBatchModeError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "append in batch mode")
+  }
+
+  def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "dynamic overwrite in batch mode")
+  }
+
+  def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "truncate in batch mode")
+  }
+
+  def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
+    unsupportedTableOperationError(table.name(), "overwrite by filter in batch mode")
+  }
+
  def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
     new AnalysisException(
       errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
@@ -1292,38 +1322,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     new CannotReplaceMissingTableException(tableIdentifier, cause)
   }
 
-  private def unsupportedTableOperationError(table: Table, cmd: String): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1113",
-      messageParameters = Map(
-        "table" -> table.name,
-        "cmd" -> cmd))
-  }
-
-  def unsupportedBatchReadError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "batch scan")
-  }
-
-  def unsupportedMicroBatchOrContinuousScanError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "either micro-batch or continuous scan")
-  }
-
-  def unsupportedAppendInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "append in batch mode")
-  }
-
-  def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "dynamic overwrite in batch mode")
-  }
-
-  def unsupportedTruncateInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "truncate in batch mode")
-  }
-
-  def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table, "overwrite by filter in batch mode")
-  }
-
   def streamingSourcesDoNotSupportCommonExecutionModeError(
       microBatchSources: Seq[String],
       continuousSources: Seq[String]): Throwable = {
@@ -2395,10 +2393,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("windowExpressions" -> 
windowExpressions.toString()))
   }
 
-  def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
-    DataTypeErrors.charOrVarcharTypeAsStringUnsupportedError()
-  }
-
  def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_FORMAT.ESC_IN_THE_MIDDLE",
@@ -3164,24 +3158,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("expr" -> expr.sql))
   }
 
-  def unresolvedColumnWithSuggestionError(
-      objectName: String, suggestion: String): AnalysisException = {
+  def unresolvedColumnError(colName: String, fields: Array[String]): AnalysisException = {
     new AnalysisException(
       errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
       messageParameters = Map(
-        "objectName" -> toSQLId(objectName),
-        "proposal" -> suggestion
+        "objectName" -> toSQLId(colName),
+        "proposal" -> fields.map(toSQLId).mkString(", ")
       )
     )
   }
 
-  def resolveException(colName: String, fields: Array[String]): AnalysisException = {
-    QueryCompilationErrors.unresolvedColumnWithSuggestionError(
-      colName,
-      fields.map(toSQLId).mkString(", ")
-    )
-  }
-
  def cannotParseIntervalError(delayThreshold: String, e: Throwable): Throwable = {
     val threshold = if (delayThreshold == null) "" else delayThreshold
     new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f07496e6430..68ea6dcffb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -248,7 +248,7 @@ class Dataset[T] private[sql](
   private[sql] def resolve(colName: String): NamedExpression = {
     val resolver = sparkSession.sessionState.analyzer.resolver
     queryExecution.analyzed.resolveQuoted(colName, resolver)
-      .getOrElse(throw QueryCompilationErrors.resolveException(colName, schema.fieldNames))
+      .getOrElse(throw QueryCompilationErrors.unresolvedColumnError(colName, schema.fieldNames))
   }
 
   private[sql] def numericColumns: Seq[Expression] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index 28431972253..a3afaa36ab9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -42,7 +42,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
         throw QueryCompilationErrors.unsupportedBatchReadError(r.table)
 
       case r: StreamingRelationV2 if !r.table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) =>
-        throw QueryCompilationErrors.unsupportedMicroBatchOrContinuousScanError(r.table)
+        throw QueryCompilationErrors.unsupportedStreamingScanError(r.table)
 
       // TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
       //       a logical plan for streaming write.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index d049f27c21e..7bf81fb9865 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -113,9 +113,16 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
 
     spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING 
foo")
 
-    intercept[AnalysisException] {
-      spark.readStream.table(tableIdentifier)
-    }.message.contains("does not support either micro-batch or continuous scan")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream.table(tableIdentifier)
+      },
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      parameters = Map(
+        "tableName" -> "`testcat`.`table_name`",
+        "operation" -> "either micro-batch or continuous scan"
+      )
+    )
   }
 
   test("read: read table with custom catalog") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
