This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e60b232c769 [SPARK-46968][SQL] Replace 
`UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql`
6e60b232c769 is described below

commit 6e60b232c7693738b1d005858e5dac24e7bafcaf
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sat Feb 3 00:22:06 2024 -0800

    [SPARK-46968][SQL] Replace `UnsupportedOperationException` by 
`SparkUnsupportedOperationException` in `sql`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to replace all `UnsupportedOperationException` by 
`SparkUnsupportedOperationException` in `sql` code base, and introduce new 
legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix.
    
    ### Why are the changes needed?
    To unify Spark SQL exception, and port Java exceptions on Spark exceptions 
with error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it can if user's code assumes some particular format of 
`UnsupportedOperationException` messages.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "core/testOnly *SparkThrowableSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44937 from MaxGekk/migrate-UnsupportedOperationException-api.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 common/utils/src/main/resources/error/error-classes.json     | 10 ++++++++++
 .../org/apache/spark/sql/catalyst/trees/QueryContexts.scala  | 12 ++++++------
 .../scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala  |  3 ++-
 .../org/apache/spark/sql/execution/UnsafeRowSerializer.scala |  2 +-
 .../sql/execution/streaming/CompactibleFileStreamLog.scala   |  4 ++--
 .../spark/sql/execution/streaming/ValueStateImpl.scala       |  2 --
 .../streaming/state/HDFSBackedStateStoreProvider.scala       |  5 ++---
 .../apache/spark/sql/execution/streaming/state/RocksDB.scala |  7 ++++---
 8 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 8399311cbfc4..ef9e81c98e05 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -7489,6 +7489,16 @@
       "Datatype not supported <dt>"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3193" : {
+    "message" : [
+      "Creating multiple column families with HDFSBackedStateStoreProvider is 
not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3197" : {
+    "message" : [
+      "Failed to create column family with reserved name=<colFamilyName>"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
index 57271e535afb..c716002ef35c 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import org.apache.spark.{QueryContext, QueryContextType}
+import org.apache.spark.{QueryContext, QueryContextType, 
SparkUnsupportedOperationException}
 
 /** The class represents error context of a SQL query. */
 case class SQLQueryContext(
@@ -131,16 +131,16 @@ case class SQLQueryContext(
       originStartIndex.get <= originStopIndex.get
   }
 
-  override def callSite: String = throw new UnsupportedOperationException
+  override def callSite: String = throw SparkUnsupportedOperationException()
 }
 
 case class DataFrameQueryContext(stackTrace: Seq[StackTraceElement]) extends 
QueryContext {
   override val contextType = QueryContextType.DataFrame
 
-  override def objectType: String = throw new UnsupportedOperationException
-  override def objectName: String = throw new UnsupportedOperationException
-  override def startIndex: Int = throw new UnsupportedOperationException
-  override def stopIndex: Int = throw new UnsupportedOperationException
+  override def objectType: String = throw SparkUnsupportedOperationException()
+  override def objectName: String = throw SparkUnsupportedOperationException()
+  override def startIndex: Int = throw SparkUnsupportedOperationException()
+  override def stopIndex: Int = throw SparkUnsupportedOperationException()
 
   override val fragment: String = {
     stackTrace.headOption.map { firstElem =>
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
index 98768a35e8a5..a98aa26d02ef 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.util.SparkClassUtils
 
@@ -53,6 +54,6 @@ private[sql] object UDTUtils extends UDTUtils {
 
 private[sql] object DefaultUDTUtils extends UDTUtils {
   override def toRow(value: Any, udt: UserDefinedType[Any]): Any = {
-    throw new UnsupportedOperationException()
+    throw SparkUnsupportedOperationException()
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 8563bbcd7960..42fcfa8d60fa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -37,7 +37,7 @@ import org.apache.spark.unsafe.Platform
  * instance that is backed by an on-heap byte array.
  *
  * Note that this serializer implements only the [[Serializer]] methods that 
are used during
- * shuffle, so certain [[SerializerInstance]] methods will throw 
UnsupportedOperationException.
+ * shuffle, so certain [[SerializerInstance]] methods will throw 
SparkUnsupportedOperationException.
  *
  * @param numFields the number of fields in the row being serialized.
  */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index ef7cefe2394d..8d38bba1f2a6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -178,8 +178,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
    * CompactibleFileStreamLog maintains logs by itself, and manual purging 
might break internal
    * state, specifically which latest compaction batch is purged.
    *
-   * To simplify the situation, this method just throws 
UnsupportedOperationException regardless
-   * of given parameter, and let CompactibleFileStreamLog handles purging by 
itself.
+   * To simplify the situation, this method just throws 
SparkUnsupportedOperationException
+   * regardless of given parameter, and let CompactibleFileStreamLog handles 
purging by itself.
    */
   override def purge(thresholdBatchId: Long): Unit =
     throw QueryExecutionErrors.cannotPurgeAsBreakInternalStateError()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
index d82ce5ba1125..11ae7f65b43d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.streaming
 
-import java.io.Serializable
-
 import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.internal.Logging
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index ffb618d0fbb0..dd04053c5471 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -115,8 +115,7 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     override def id: StateStoreId = 
HDFSBackedStateStoreProvider.this.stateStoreId
 
     override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-      throw new UnsupportedOperationException("Creating multiple column 
families with " +
-        "HDFSBackedStateStoreProvider is not supported")
+      throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3193")
     }
 
     override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index bf1a1c50d350..b3d981e4b25d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _}
 import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -252,8 +252,9 @@ class RocksDB(
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
     if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new UnsupportedOperationException("Failed to create column family 
with reserved " +
-        s"name=$colFamilyName")
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3197",
+        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
     }
 
     if (!checkColFamilyExists(colFamilyName)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to