This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6e60b232c769 [SPARK-46968][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql` 6e60b232c769 is described below commit 6e60b232c7693738b1d005858e5dac24e7bafcaf Author: Max Gekk <max.g...@gmail.com> AuthorDate: Sat Feb 3 00:22:06 2024 -0800 [SPARK-46968][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql` ### What changes were proposed in this pull request? In the PR, I propose to replace all `UnsupportedOperationException` by `SparkUnsupportedOperationException` in the `sql` code base, and introduce new legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix. ### Why are the changes needed? To unify Spark SQL exceptions, and port Java exceptions to Spark exceptions with error classes. ### Does this PR introduce _any_ user-facing change? Yes, it can if the user's code assumes some particular format of `UnsupportedOperationException` messages. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44937 from MaxGekk/migrate-UnsupportedOperationException-api. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- common/utils/src/main/resources/error/error-classes.json | 10 ++++++++++ .../org/apache/spark/sql/catalyst/trees/QueryContexts.scala | 12 ++++++------ .../scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala | 3 ++- .../org/apache/spark/sql/execution/UnsafeRowSerializer.scala | 2 +- .../sql/execution/streaming/CompactibleFileStreamLog.scala | 4 ++-- .../spark/sql/execution/streaming/ValueStateImpl.scala | 2 -- .../streaming/state/HDFSBackedStateStoreProvider.scala | 5 ++--- .../apache/spark/sql/execution/streaming/state/RocksDB.scala | 7 ++++--- 8 files changed, 27 insertions(+), 18 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 8399311cbfc4..ef9e81c98e05 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -7489,6 +7489,16 @@ "Datatype not supported <dt>" ] }, + "_LEGACY_ERROR_TEMP_3193" : { + "message" : [ + "Creating multiple column families with HDFSBackedStateStoreProvider is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3197" : { + "message" : [ + "Failed to create column family with reserved name=<colFamilyName>" + ] + }, "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : { "message" : [ "<errorMessage>" diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala index 57271e535afb..c716002ef35c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.trees -import org.apache.spark.{QueryContext, QueryContextType} +import org.apache.spark.{QueryContext, QueryContextType, SparkUnsupportedOperationException} /** 
The class represents error context of a SQL query. */ case class SQLQueryContext( @@ -131,16 +131,16 @@ case class SQLQueryContext( originStartIndex.get <= originStopIndex.get } - override def callSite: String = throw new UnsupportedOperationException + override def callSite: String = throw SparkUnsupportedOperationException() } case class DataFrameQueryContext(stackTrace: Seq[StackTraceElement]) extends QueryContext { override val contextType = QueryContextType.DataFrame - override def objectType: String = throw new UnsupportedOperationException - override def objectName: String = throw new UnsupportedOperationException - override def startIndex: Int = throw new UnsupportedOperationException - override def stopIndex: Int = throw new UnsupportedOperationException + override def objectType: String = throw SparkUnsupportedOperationException() + override def objectName: String = throw SparkUnsupportedOperationException() + override def startIndex: Int = throw SparkUnsupportedOperationException() + override def stopIndex: Int = throw SparkUnsupportedOperationException() override val fragment: String = { stackTrace.headOption.map { firstElem => diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala index 98768a35e8a5..a98aa26d02ef 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/UDTUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.util import scala.util.control.NonFatal +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.util.SparkClassUtils @@ -53,6 +54,6 @@ private[sql] object UDTUtils extends UDTUtils { private[sql] object DefaultUDTUtils extends UDTUtils { override def toRow(value: Any, udt: UserDefinedType[Any]): Any = { - throw new UnsupportedOperationException() 
+ throw SparkUnsupportedOperationException() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 8563bbcd7960..42fcfa8d60fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -37,7 +37,7 @@ import org.apache.spark.unsafe.Platform * instance that is backed by an on-heap byte array. * * Note that this serializer implements only the [[Serializer]] methods that are used during - * shuffle, so certain [[SerializerInstance]] methods will throw UnsupportedOperationException. + * shuffle, so certain [[SerializerInstance]] methods will throw SparkUnsupportedOperationException. * * @param numFields the number of fields in the row being serialized. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index ef7cefe2394d..8d38bba1f2a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -178,8 +178,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal * state, specifically which latest compaction batch is purged. * - * To simplify the situation, this method just throws UnsupportedOperationException regardless - * of given parameter, and let CompactibleFileStreamLog handles purging by itself. + * To simplify the situation, this method just throws SparkUnsupportedOperationException + * regardless of given parameter, and let CompactibleFileStreamLog handles purging by itself. 
*/ override def purge(thresholdBatchId: Long): Unit = throw QueryExecutionErrors.cannotPurgeAsBreakInternalStateError() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala index d82ce5ba1125..11ae7f65b43d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.streaming -import java.io.Serializable - import org.apache.commons.lang3.SerializationUtils import org.apache.spark.internal.Logging diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index ffb618d0fbb0..dd04053c5471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.{SparkConf, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -115,8 +115,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId override def createColFamilyIfAbsent(colFamilyName: String): Unit = { - throw new UnsupportedOperationException("Creating multiple column families with " + - "HDFSBackedStateStoreProvider is not 
supported") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3193") } override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index bf1a1c50d350..b3d981e4b25d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -34,7 +34,7 @@ import org.rocksdb.{RocksDB => NativeRocksDB, _} import org.rocksdb.CompressionType._ import org.rocksdb.TickerType._ -import org.apache.spark.TaskContext +import org.apache.spark.{SparkUnsupportedOperationException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors @@ -252,8 +252,9 @@ class RocksDB( */ def createColFamilyIfAbsent(colFamilyName: String): Unit = { if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { - throw new UnsupportedOperationException("Failed to create column family with reserved " + - s"name=$colFamilyName") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3197", + messageParameters = Map("colFamilyName" -> colFamilyName).toMap) } if (!checkColFamilyExists(colFamilyName)) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org