This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fa60a7e216e6 [SPARK-46795][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql/core`
fa60a7e216e6 is described below

commit fa60a7e216e63b1edb199b1610b26197815c656b
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jan 23 20:42:16 2024 +0300

    [SPARK-46795][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql/core`
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to replace all `UnsupportedOperationException` usages with
    `SparkUnsupportedOperationException` in the `sql/core` code base, and to introduce
    new legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix.
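    
    As an illustration, here is a minimal sketch of the migration pattern (the enclosing methods are hypothetical; the constructor forms and error class IDs follow the diff below):
    ```scala
    import org.apache.spark.SparkUnsupportedOperationException
    
    // Before: a plain Java exception, with or without a hard-coded message.
    def oldStyle(): Nothing =
      throw new UnsupportedOperationException("Unsupported encoding: RLE")
    
    // After: the message-less case goes through the default error class.
    def newStyleNoMessage(): Nothing =
      throw SparkUnsupportedOperationException()
    
    // After: messages become error classes in error-classes.json; the <encoding>
    // placeholder of the message template is filled via messageParameters.
    def newStyleWithParams(encoding: String): Nothing =
      throw new SparkUnsupportedOperationException(
        errorClass = "_LEGACY_ERROR_TEMP_3189",
        messageParameters = Map("encoding" -> encoding))
    ```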
    
    ### Why are the changes needed?
    To unify Spark SQL exceptions, and to port Java exceptions to Spark exceptions
    with error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it can if a user's code assumes a particular format of
    `UnsupportedOperationException` messages.
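    
    For example, instead of matching on message text, callers can now check the error class (a sketch; the helper is hypothetical, and `getErrorClass` is assumed to come from `SparkThrowable`, which `SparkUnsupportedOperationException` implements):
    ```scala
    import org.apache.spark.SparkThrowable
    
    // Run `action` and report whether it failed with the "Schema evolution not
    // supported." case, identified by its error class rather than by message text.
    def failsWithSchemaEvolution(action: () => Unit): Boolean =
      try { action(); false } catch {
        case e: SparkThrowable => e.getErrorClass == "_LEGACY_ERROR_TEMP_3185"
        case _: UnsupportedOperationException => false
      }
    ```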
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt "core/testOnly *SparkThrowableSuite"
    $ build/sbt "test:testOnly *FileBasedDataSourceSuite"
    $ build/sbt "test:testOnly *ColumnarRulesSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44772 from MaxGekk/migrate-UnsupportedOperationException-sql.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    | 150 +++++++++++++++++++++
 .../sql/execution/columnar/ColumnDictionary.java   |   3 +-
 .../datasources/orc/OrcArrayColumnVector.java      |  25 ++--
 .../datasources/orc/OrcAtomicColumnVector.java     |   9 +-
 .../datasources/orc/OrcMapColumnVector.java        |  25 ++--
 .../datasources/orc/OrcStructColumnVector.java     |  25 ++--
 .../parquet/ParquetVectorUpdaterFactory.java       |   3 +-
 .../parquet/VectorizedColumnReader.java            |  16 ++-
 .../parquet/VectorizedParquetRecordReader.java     |   3 +-
 .../parquet/VectorizedPlainValuesReader.java       |   3 +-
 .../datasources/parquet/VectorizedReaderBase.java  |  51 +++----
 .../parquet/VectorizedRleValuesReader.java         |  41 +++---
 .../execution/vectorized/ColumnVectorUtils.java    |   4 +-
 .../execution/vectorized/MutableColumnarRow.java   |  10 +-
 .../spark/sql/artifact/ArtifactManager.scala       |   5 +-
 .../spark/sql/execution/SparkStrategies.scala      |   7 +-
 .../spark/sql/execution/UnsafeRowSerializer.scala  |  15 ++-
 .../sql/execution/WholeStageCodegenExec.scala      |  12 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    |   3 +-
 .../execution/aggregate/SortAggregateExec.scala    |   5 +-
 .../columnar/GenerateColumnAccessor.scala          |  11 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  12 +-
 .../datasources/parquet/ParquetUtils.scala         |  17 ++-
 .../spark/sql/execution/joins/HashedRelation.scala |  28 ++--
 .../FlatMapGroupsInPandasWithStateExec.scala       |   6 +-
 .../streaming/AcceptsLatestSeenOffsetHandler.scala |   7 +-
 .../AvailableNowMicroBatchStreamWrapper.scala      |   5 +-
 .../streaming/AvailableNowSourceWrapper.scala      |   5 +-
 .../sql/execution/streaming/GroupStateImpl.scala   |   4 +-
 .../sources/RatePerMicroBatchProvider.scala        |   3 +-
 .../sources/RatePerMicroBatchStream.scala          |   4 +-
 .../streaming/state/HDFSBackedStateStoreMap.scala  |   3 +-
 .../sql/execution/window/WindowFunctionFrame.scala |   5 +-
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     |   8 +-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      |  15 ++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  47 ++++---
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |   8 +-
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |   8 +-
 .../apache/spark/sql/util/MapperRowCounter.scala   |   8 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |   6 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |  10 +-
 .../sql/connector/DataSourceV2FunctionSuite.scala  |   3 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    |   2 +-
 .../sql/connector/TableCapabilityCheckSuite.scala  |   5 +-
 .../spark/sql/connector/V1ReadFallbackSuite.scala  |   5 +-
 .../spark/sql/execution/ColumnarRulesSuite.scala   |   5 +-
 .../apache/spark/sql/execution/PlannerSuite.scala  |   3 +-
 .../spark/sql/execution/SparkPlanSuite.scala       |   4 +-
 .../execution/columnar/ColumnarDataTypeUtils.scala |   5 +-
 49 files changed, 451 insertions(+), 216 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 8f4e04ba5456..6088300f8e64 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -7297,6 +7297,156 @@
       ""
     ]
   },
+  "_LEGACY_ERROR_TEMP_3161" : {
+    "message" : [
+      "Uploading artifact file to local file system destination path is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3162" : {
+    "message" : [
+      "Unsupported physical type <type>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3163" : {
+    "message" : [
+      "Unsupported number of children: <num>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3165" : {
+    "message" : [
+      "Cannot merge <classA> with <classB>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3166" : {
+    "message" : [
+      "latestOffset(Offset, ReadLimit) should be called instead of this method"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3167" : {
+    "message" : [
+      "continuous mode is not supported!"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3168" : {
+    "message" : [
+      "hasTimedOut is true however there's no timeout configured"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3169" : {
+    "message" : [
+      "AcceptsLatestSeenOffset is not supported with DSv1 streaming source: <unsupportedSources>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3170" : {
+    "message" : [
+      "SortAggregate code-gen does not support grouping keys"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3171" : {
+    "message" : [
+      "Number of nulls not set for Parquet file <filePath>. Set SQLConf <config> to false and execute again."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3172" : {
+    "message" : [
+      "No min/max found for Parquet file <filePath>. Set SQLConf <config> to false and execute again."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3173" : {
+    "message" : [
+      "Cannot specify 'USING index_type' in 'CREATE INDEX'"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3175" : {
+    "message" : [
+      "Index Type <v> is not supported. The supported Index Types are: <supportedIndexTypeList>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3176" : {
+    "message" : [
+      "applyInPandasWithState is unsupported in batch query. Use applyInPandas instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3177" : {
+    "message" : [
+      "<class> does not support function: <funcName>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3178" : {
+    "message" : [
+      "<class> does not support inverse distribution function: <funcName>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3179" : {
+    "message" : [
+      "createIndex is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3180" : {
+    "message" : [
+      "indexExists is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3181" : {
+    "message" : [
+      "dropIndex is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3182" : {
+    "message" : [
+      "listIndexes is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3183" : {
+    "message" : [
+      "TableSample is not supported by this data source"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3184" : {
+    "message" : [
+      "<class> does not support aggregate function: <funcName> with DISTINCT"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3185" : {
+    "message" : [
+      "Schema evolution not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3186" : {
+    "message" : [
+      "Boolean is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3187" : {
+    "message" : [
+      "only readInts is valid."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3188" : {
+    "message" : [
+      "only skipIntegers is valid"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3189" : {
+    "message" : [
+      "Unsupported encoding: <encoding>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3190" : {
+    "message" : [
+      "RLE encoding is not supported for values of type: <typeName>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3191" : {
+    "message" : [
+      "Dictionary encoding does not support String"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3192" : {
+    "message" : [
+      "Datatype not supported <dt>"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
index 29271fc5c0a2..523dde831342 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.execution.vectorized.Dictionary;
 
 public final class ColumnDictionary implements Dictionary {
@@ -59,6 +60,6 @@ public final class ColumnDictionary implements Dictionary {
 
   @Override
   public byte[] decodeToBinary(int id) {
-    throw new UnsupportedOperationException("Dictionary encoding does not support String");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3191");
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
index b0c818f5a4df..bfed046c9d39 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -52,61 +53,61 @@ public class OrcArrayColumnVector extends OrcColumnVector {
 
   @Override
   public boolean getBoolean(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte getByte(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public short getShort(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public int getInt(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public long getLong(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public UTF8String getUTF8String(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte[] getBinary(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public ColumnarMap getMap(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
index f120482f63fa..36e5da64bb75 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.hadoop.hive.ql.exec.vector.*;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.types.DataType;
@@ -71,7 +72,7 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
     } else if (vector instanceof TimestampColumnVector timestampColumnVector) {
       timestampData = timestampColumnVector;
     } else {
-      throw new UnsupportedOperationException();
+      throw SparkUnsupportedOperationException.apply();
     }
   }
 
@@ -146,16 +147,16 @@ public class OrcAtomicColumnVector extends OrcColumnVector {
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public ColumnarMap getMap(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
index 7eedd8b59412..a6d82360364f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.MapType;
@@ -55,61 +56,61 @@ public class OrcMapColumnVector extends OrcColumnVector {
 
   @Override
   public boolean getBoolean(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte getByte(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public short getShort(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public int getInt(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public long getLong(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public UTF8String getUTF8String(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte[] getBinary(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java
index 48e540d22095..d675beb6536e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.StructType;
@@ -45,61 +46,61 @@ public class OrcStructColumnVector extends OrcColumnVector {
 
   @Override
   public boolean getBoolean(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte getByte(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public short getShort(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public int getInt(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public long getLong(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public float getFloat(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public double getDouble(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public UTF8String getUTF8String(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte[] getBinary(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public ColumnarMap getMap(int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 3863818b0255..0d8713b58cec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotat
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
@@ -279,7 +280,7 @@ public class ParquetVectorUpdaterFactory {
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      throw new UnsupportedOperationException("Boolean is not supported");
+      throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3186");
     }
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6479644968ed..d580023bc877 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
 import java.time.ZoneId;
+import java.util.Map;
 
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
@@ -37,6 +38,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotat
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
 import org.apache.parquet.schema.PrimitiveType;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -336,7 +338,8 @@ public class VectorizedColumnReader {
       @SuppressWarnings("deprecation")
       Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
       if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+        throw new SparkUnsupportedOperationException(
+          "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", dataEncoding.toString()));
       }
       this.dataColumn = new VectorizedRleValuesReader();
       this.isCurrentPageDictionaryEncoded = true;
@@ -371,18 +374,21 @@ public class VectorizedColumnReader {
         if (typeName == BOOLEAN) {
           yield new VectorizedRleValuesReader(1);
         } else {
-          throw new UnsupportedOperationException(
-            "RLE encoding is not supported for values of type: " + typeName);
+          throw new SparkUnsupportedOperationException(
+            "_LEGACY_ERROR_TEMP_3190", Map.of("typeName", typeName.toString()));
         }
       }
-      default -> throw new UnsupportedOperationException("Unsupported encoding: " + encoding);
+      default ->
+        throw new SparkUnsupportedOperationException(
+          "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", encoding.toString()));
     };
   }
 
 
   private int readPageV1(DataPageV1 page) throws IOException {
     if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
-      throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
+      throw new SparkUnsupportedOperationException(
+        "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", page.getDlEncoding().toString()));
     }
 
     int pageValueCount = page.getValueCount();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index baefa254466f..ca9d6f3c5db0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
 import scala.Option;
 import scala.jdk.javaapi.CollectionConverters;
@@ -375,7 +376,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
         ColumnDescriptor desc = column.descriptor().get();
         ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath());
         if (!fd.equals(desc)) {
-          throw new UnsupportedOperationException("Schema evolution not supported.");
+          throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3185");
         }
       } else {
         for (ParquetColumn childColumn : 
CollectionConverters.asJava(column.children())) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index fb40a131d2a6..4316e49d5b94 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -26,6 +26,7 @@ import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.ParquetDecodingException;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -50,7 +51,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
 
   @Override
   public void skip() {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   private void updateCurrentByte() {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
index b6715f1e7a07..ab8fd9bdb6ff 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
 
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 /**
@@ -28,129 +29,129 @@ public class VectorizedReaderBase extends ValuesReader implements VectorizedValu
 
   @Override
   public void skip() {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public byte readByte() {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public short readShort() {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public Binary readBinary(int len) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readBooleans(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readBytes(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readShorts(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readIntegers(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId,
       boolean failIfRebase) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readUnsignedIntegers(int total, WritableColumnVector c, int 
rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readLongs(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readLongsWithRebase(int total, WritableColumnVector c, int rowId,
       boolean failIfRebase, String timeZone) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readFloats(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readDoubles(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void readBinary(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipBooleans(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipBytes(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipShorts(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipIntegers(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipLongs(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipFloats(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipDoubles(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipBinary(int total) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
   public void skipFixedLenByteArray(int total, int len) {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 584aaa2d118b..0d380997fd5b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -29,6 +29,7 @@ import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 /**
@@ -709,43 +710,43 @@ public final class VectorizedRleValuesReader extends ValuesReader
 
   @Override
   public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readIntegersWithRebase(
       int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public byte readByte() {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public short readShort() {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readBytes(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readShorts(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readLongs(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
@@ -755,12 +756,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
       int rowId,
       boolean failIfRebase,
       String timeZone) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readBinary(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
@@ -786,17 +787,17 @@ public final class VectorizedRleValuesReader extends ValuesReader
 
   @Override
   public void readFloats(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public void readDoubles(int total, WritableColumnVector c, int rowId) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
   public Binary readBinary(int len) {
-    throw new UnsupportedOperationException("only readInts is valid.");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187");
   }
 
   @Override
@@ -811,37 +812,37 @@ public final class VectorizedRleValuesReader extends ValuesReader
 
   @Override
   public void skipBytes(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipShorts(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipLongs(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipFloats(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipDoubles(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipBinary(int total) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   @Override
   public void skipFixedLenByteArray(int total, int len) {
-    throw new UnsupportedOperationException("only skipIntegers is valid");
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188");
   }
 
   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 29c106651acf..9ff385c995ff 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -182,7 +183,8 @@ public class ColumnVectorUtils {
       } else if (t instanceof TimestampNTZType) {
         dst.appendLong(DateTimeUtils.localDateTimeToMicros((LocalDateTime) o));
       } else {
-        throw new UnsupportedOperationException("Type " + t);
+        throw new SparkUnsupportedOperationException(
+          "_LEGACY_ERROR_TEMP_3192", Map.of("dt", t.toString()));
       }
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index a6b353a2e849..0464fe815989 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.vectorized;
 
 import java.math.BigDecimal;
+import java.util.Map;
 
+import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.*;
@@ -96,7 +98,7 @@ public final class MutableColumnarRow extends InternalRow {
 
   @Override
   public boolean anyNull() {
-    throw new UnsupportedOperationException();
+    throw SparkUnsupportedOperationException.apply();
   }
 
   @Override
@@ -196,7 +198,8 @@ public final class MutableColumnarRow extends InternalRow {
     } else if (dataType instanceof MapType) {
       return getMap(ordinal);
     } else {
-      throw new UnsupportedOperationException("Datatype not supported " + dataType);
+      throw new SparkUnsupportedOperationException(
+        "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
     }
   }
 
@@ -224,7 +227,8 @@ public final class MutableColumnarRow extends InternalRow {
       } else if (dt instanceof CalendarIntervalType) {
         setInterval(ordinal, (CalendarInterval) value);
       } else {
-        throw new UnsupportedOperationException("Datatype not supported " + dt);
+        throw new SparkUnsupportedOperationException(
+          "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dt.toString()));
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 69a5fd860740..47f5dbdac488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 import org.apache.commons.io.{FilenameUtils, FileUtils}
 import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}
 
-import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkEnv}
+import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkEnv, SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{CONNECT_SCALA_UDF_STUB_PREFIXES, EXECUTOR_USER_CLASS_PATH_FIRST}
 import org.apache.spark.sql.SparkSession
@@ -275,8 +275,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
         // `spark.sql.artifact.copyFromLocalToFs.allowDestLocal`
         // to `true` when starting spark driver, we should only enable it for testing
         // purpose.
-        throw new UnsupportedOperationException(
-          "Uploading artifact file to local file system destination path is not supported.")
+        throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3161")
       }
     }
     fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cbb34d6d484f..60079152781d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.Locale
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -59,7 +59,7 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
   override def output: Seq[Attribute] = plan.output
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException()
+    throw SparkUnsupportedOperationException()
   }
 }
 
@@ -863,8 +863,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         ) :: Nil
       case _: FlatMapGroupsInPandasWithState =>
         // TODO(SPARK-40443): support applyInPandasWithState in batch query
-        throw new UnsupportedOperationException(
-          "applyInPandasWithState is unsupported in batch query. Use applyInPandas instead.")
+        throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176")
       case logical.CoGroup(
           f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr, left, right) =>
         execution.CoGroupExec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 8ab553369de6..8563bbcd7960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import com.google.common.io.ByteStreams
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -79,12 +80,12 @@ private class UnsafeRowSerializerInstance(
 
     override def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = {
       // This method is never called by shuffle code.
-      throw new UnsupportedOperationException
+      throw SparkUnsupportedOperationException()
     }
 
     override def writeObject[T: ClassTag](t: T): SerializationStream = {
       // This method is never called by shuffle code.
-      throw new UnsupportedOperationException
+      throw SparkUnsupportedOperationException()
     }
 
     override def flush(): Unit = {
@@ -145,7 +146,7 @@ private class UnsafeRowSerializerInstance(
 
       override def asIterator: Iterator[Any] = {
         // This method is never called by shuffle code.
-        throw new UnsupportedOperationException
+        throw SparkUnsupportedOperationException()
       }
 
       override def readKey[T: ClassTag](): T = {
@@ -166,7 +167,7 @@ private class UnsafeRowSerializerInstance(
 
       override def readObject[T: ClassTag](): T = {
         // This method is never called by shuffle code.
-        throw new UnsupportedOperationException
+        throw SparkUnsupportedOperationException()
       }
 
       override def close(): Unit = {
@@ -176,9 +177,9 @@ private class UnsafeRowSerializerInstance(
   }
 
   // These methods are never called by shuffle code.
-  override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException
+  override def serialize[T: ClassTag](t: T): ByteBuffer = throw SparkUnsupportedOperationException()
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 058df24fc13d..6fbb43a95a07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -339,7 +339,7 @@ trait CodegenSupport extends SparkPlan {
    *       different inputs(join build side, aggregate buffer, etc.), or other special cases.
    */
   def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
@@ -356,7 +356,9 @@ trait CodegenSupport extends SparkPlan {
     } else if (children.length == 1) {
       children.head.asInstanceOf[CodegenSupport].needCopyResult
     } else {
-      throw new UnsupportedOperationException
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3163",
+        messageParameters = Map("num" -> children.length.toString))
     }
   }
 
@@ -779,11 +781,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def doProduce(ctx: CodegenContext): String = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 37cdea084d8a..5ec56e86d795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -260,7 +261,7 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements)
 // caused by skew join optimization. However, this shouldn't apply to the sub-plan under skew join,
 // as it's guaranteed to satisfy distribution requirement.
 case class SkewJoinChildWrapper(plan: SparkPlan) extends LeafExecNode {
-  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
+  override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException()
   override def output: Seq[Attribute] = plan.output
   override def outputPartitioning: Partitioning = plan.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 6042ff7b2caf..c4ff2454ae67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -98,11 +99,11 @@ case class SortAggregateExec(
   protected override def needHashTable: Boolean = false
 
   protected override def doProduceWithKeys(ctx: CodegenContext): String = {
-    throw new UnsupportedOperationException("SortAggregate code-gen does not support grouping keys")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3170")
   }
 
   protected override def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = {
-    throw new UnsupportedOperationException("SortAggregate code-gen does not support grouping keys")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3170")
   }
 
   override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 33918bcee738..5eadc7d47c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -51,16 +52,16 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalR
 
   // the writer will be used directly to avoid creating wrapper objects
   override def setDecimal(i: Int, v: Decimal, precision: Int): Unit =
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
 
   override def setInterval(i: Int, value: CalendarInterval): Unit =
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
 
-  override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException
+  override def update(i: Int, v: Any): Unit = throw SparkUnsupportedOperationException()
 
   // all other methods inherited from GenericMutableRow are not need
-  override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException
-  override def numFields: Int = throw new UnsupportedOperationException
+  override protected def genericGet(ordinal: Int): Any = throw SparkUnsupportedOperationException()
+  override def numFields: Int = throw SparkUnsupportedOperationException()
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 51daea76abc5..9fb10f42164f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkThrowable, TaskContext}
+import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
@@ -1132,8 +1132,11 @@ object JdbcUtils extends Logging with SQLConfHelper {
           if (containsIndexTypeIgnoreCase(supportedIndexTypeList, v)) {
             indexType = s"USING $v"
           } else {
-            throw new UnsupportedOperationException(s"Index Type $v is not supported." +
-              s" The supported Index Types are: ${supportedIndexTypeList.mkString(" AND ")}")
+            throw new SparkUnsupportedOperationException(
+              errorClass = "_LEGACY_ERROR_TEMP_3175",
+              messageParameters = Map(
+                "v" -> v,
+                "supportedIndexTypeList" -> supportedIndexTypeList.mkString(" AND ")))
           }
         } else {
           indexPropertyList.append(s"$k = $v")
@@ -1145,8 +1148,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
 
   def containsIndexTypeIgnoreCase(supportedIndexTypeList: Array[String], value: String): Boolean = {
     if (supportedIndexTypeList.isEmpty) {
-      throw new UnsupportedOperationException(
-        "Cannot specify 'USING index_type' in 'CREATE INDEX'")
+      throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3173")
     }
     for (indexType <- supportedIndexTypeList) {
       if (value.equalsIgnoreCase(indexType)) return true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index bd6b5bfeb4da..5020bf7333de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{PrimitiveType, Types}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
@@ -394,8 +394,11 @@ object ParquetUtils extends Logging {
       isMax: Boolean): Any = {
     val statistics = columnChunkMetaData.get(i).getStatistics
     if (!statistics.hasNonNullValue) {
-      throw new UnsupportedOperationException(s"No min/max found for Parquet 
file $filePath. " +
-        s"Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and 
execute again")
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3172",
+        messageParameters = Map(
+          "filePath" -> filePath,
+          "config" -> PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key))
     } else {
       if (isMax) statistics.genericGetMax else statistics.genericGetMin
     }
@@ -407,9 +410,11 @@ object ParquetUtils extends Logging {
       i: Int): Long = {
     val statistics = columnChunkMetaData.get(i).getStatistics
     if (!statistics.isNumNullsSet) {
-      throw new UnsupportedOperationException(s"Number of nulls not set for 
Parquet file" +
-        s" $filePath. Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to 
false and execute" +
-        s" again")
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3171",
+        messageParameters = Map(
+          "filePath" -> filePath,
+          "config" -> PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key))
     }
     statistics.getNumNulls;
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 16345bb35db2..85c198290542 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -22,7 +22,7 @@ import java.io._
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.config.{BUFFER_PAGESIZE, 
MEMORY_OFFHEAP_ENABLED}
 import org.apache.spark.memory._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -52,7 +52,7 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    * Returns null if there is no matched rows.
    */
   def get(key: Long): Iterator[InternalRow] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
@@ -64,7 +64,7 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    * Returns the matched single row with key that have only one column of 
LongType.
    */
   def getValue(key: Long): InternalRow = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
@@ -73,7 +73,7 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    * Returns null if there is no matched rows.
    */
   def getWithKeyIndex(key: InternalRow): Iterator[ValueRowWithKeyIndex] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
@@ -83,21 +83,21 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    * Returns null if there is no matched rows.
    */
   def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
    * Returns an iterator for keys index and rows of InternalRow type.
    */
   def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
    * Returns the maximum number of allowed keys index.
    */
   def maxNumKeysIndex: Int = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   /**
@@ -1043,19 +1043,19 @@ class LongHashedRelation(
   override def keys(): Iterator[InternalRow] = map.keys()
 
   override def getWithKeyIndex(key: InternalRow): 
Iterator[ValueRowWithKeyIndex] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def maxNumKeysIndex: Int = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 }
 
@@ -1126,11 +1126,11 @@ case object EmptyHashedRelation extends HashedRelation {
  */
 case object HashedRelationWithAllNullKeys extends HashedRelation {
   override def get(key: InternalRow): Iterator[InternalRow] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def getValue(key: InternalRow): InternalRow = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def asReadOnlyCopy(): HashedRelationWithAllNullKeys.type = this
@@ -1138,7 +1138,7 @@ case object HashedRelationWithAllNullKeys extends 
HashedRelation {
   override def keyIsUnique: Boolean = true
 
   override def keys(): Iterator[InternalRow] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def close(): Unit = {}
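
The HashedRelation changes above all follow the same "unsupported by default" idiom: the trait supplies throwing defaults and only the implementations that can honour a method override it. A stripped-down sketch of that idiom (the trait below is illustrative, not Spark code):

```scala
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.catalyst.InternalRow

trait KeyedLookup {
  /** Every implementation must support row-keyed lookup. */
  def get(key: InternalRow): Iterator[InternalRow]

  /** Long-keyed fast path; unsupported unless a subclass overrides it. */
  def get(key: Long): Iterator[InternalRow] =
    throw SparkUnsupportedOperationException()
}
```
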
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
index 850ee016e363..eef0b3e3e846 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.python
 
-import org.apache.spark.{JobArtifactSet, SparkException, TaskContext}
+import org.apache.spark.{JobArtifactSet, SparkException, 
SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -138,7 +138,7 @@ case class FlatMapGroupsInPandasWithStateExec(
     override def processNewDataWithInitialState(
         childDataIter: Iterator[InternalRow],
         initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = {
-      throw new UnsupportedOperationException("Should not reach here!")
+      throw SparkUnsupportedOperationException()
     }
 
     override def processTimedOutState(): Iterator[InternalRow] = {
@@ -232,7 +232,7 @@ case class FlatMapGroupsInPandasWithStateExec(
         stateData: StateData,
         valueRowIter: Iterator[InternalRow],
         hasTimedOut: Boolean): Iterator[InternalRow] = {
-      throw new UnsupportedOperationException("Should not reach here!")
+      throw SparkUnsupportedOperationException()
     }
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala
index 69795cc82c47..bf4333d8e9dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.connector.read.streaming.{AcceptsLatestSeenOffset, 
SparkDataStream}
 
 /**
@@ -47,9 +48,9 @@ object AcceptsLatestSeenOffsetHandler {
       .filter(_.isInstanceOf[Source])
 
     if (unsupportedSources.nonEmpty) {
-      throw new UnsupportedOperationException(
-        "AcceptsLatestSeenOffset is not supported with DSv1 streaming source: 
" +
-          unsupportedSources)
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3169",
+        messageParameters = Map("unsupportedSources" -> 
unsupportedSources.toString()))
     }
   }
 }
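
One detail worth noting in the hunk above: message parameters are plain strings, so the collection of offending sources is rendered via `toString()`. A tiny sketch of what that yields (the source name is made up, and the error class is assumed to be registered):

```scala
import org.apache.spark.SparkUnsupportedOperationException

val unsupportedSources = Seq("FileStreamSource[path=/tmp/in]")

val e = new SparkUnsupportedOperationException(
  errorClass = "_LEGACY_ERROR_TEMP_3169",
  // A Seq renders as e.g. "List(FileStreamSource[path=/tmp/in])".
  messageParameters = Map("unsupportedSources" -> unsupportedSources.toString()))

assert(e.getMessageParameters.get("unsupportedSources").startsWith("List("))
```
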
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala
index f60468c85e6e..3df358ae9bf8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.connector.read.{InputPartition, 
PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
@@ -29,8 +30,8 @@ import 
org.apache.spark.sql.connector.read.streaming.MicroBatchStream
 class AvailableNowMicroBatchStreamWrapper(delegate: MicroBatchStream)
   extends AvailableNowDataStreamWrapper(delegate) with MicroBatchStream {
 
-  override def latestOffset(): streaming.Offset = throw new 
UnsupportedOperationException(
-    "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  override def latestOffset(): streaming.Offset =
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166")
 
   override def planInputPartitions(start: streaming.Offset, end: 
streaming.Offset):
   Array[InputPartition] = delegate.planInputPartitions(start, end)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala
index e971b1737ccc..af5713144ebb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
 
@@ -30,8 +31,8 @@ class AvailableNowSourceWrapper(delegate: Source)
 
   override def schema: StructType = delegate.schema
 
-  override def getOffset: Option[Offset] = throw new 
UnsupportedOperationException(
-    "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  override def getOffset: Option[Offset] =
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166")
 
   override def getBatch(start: Option[Offset], end: Offset): DataFrame =
     delegate.getBatch(start, end)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index c9ade7b568e8..cb283699b4e3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.api.java.Optional
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, 
NoTimeout, ProcessingTimeTimeout}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
@@ -217,8 +218,7 @@ private[sql] object GroupStateImpl {
       throw new IllegalArgumentException("eventTimeWatermarkMs must be 0 or 
positive if present")
     }
     if (hasTimedOut && timeoutConf == NoTimeout) {
-      throw new UnsupportedOperationException(
-        "hasTimedOut is true however there's no timeout configured")
+      throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3168")
     }
 
     new GroupStateImpl[S](
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index 17cc1860fbdc..c518f6fe7bae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import java.util
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, 
TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
@@ -109,7 +110,7 @@ class RatePerMicroBatchTable(
         advanceMillisPerBatch, options)
 
     override def toContinuousStream(checkpointLocation: String): 
ContinuousStream = {
-      throw new UnsupportedOperationException("continuous mode is not 
supported!")
+      throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3167")
     }
 
     override def columnarSupportMode(): Scan.ColumnarSupportMode =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
index 8dca7d40704a..d51f87cb1a57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -38,8 +39,7 @@ class RatePerMicroBatchStream(
   override def initialOffset(): Offset = RatePerMicroBatchStreamOffset(0L, 
startTimestamp)
 
   override def latestOffset(): Offset = {
-    throw new UnsupportedOperationException(
-      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166")
   }
 
   override def getDefaultReadLimit: ReadLimit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
index 59b01b5b454d..32ff87f754d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -77,7 +78,7 @@ class NoPrefixHDFSBackedStateStoreMap extends 
HDFSBackedStateStoreMap {
   }
 
   override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = {
-    throw new UnsupportedOperationException("Prefix scan is not supported!")
+    throw SparkUnsupportedOperationException()
   }
 
   override def clear(): Unit = map.clear()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 4aa7444c407e..44d18b44e4ef 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.window
 
 import java.util
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
@@ -206,9 +207,9 @@ abstract class OffsetWindowFunctionFrameBase(
     inputIndex = offset
   }
 
-  override def currentLowerBound(): Int = throw new 
UnsupportedOperationException()
+  override def currentLowerBound(): Int = throw 
SparkUnsupportedOperationException()
 
-  override def currentUpperBound(): Int = throw new 
UnsupportedOperationException()
+  override def currentUpperBound(): Int = throw 
SparkUnsupportedOperationException()
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index f745e466ed9e..62c31b1c4c5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -49,8 +50,11 @@ private object DB2Dialect extends JdbcDialect {
     override def visitAggregateFunction(
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && 
distinctUnsupportedAggregateFunctions.contains(funcName)) {
-        throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3184",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index cd151f790adf..74eca7e48577 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, 
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -268,8 +269,11 @@ private[sql] object H2Dialect extends JdbcDialect {
     override def visitAggregateFunction(
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && 
distinctUnsupportedAggregateFunctions.contains(funcName)) {
-        throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3184",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
@@ -296,8 +300,11 @@ private[sql] object H2Dialect extends JdbcDialect {
           case _ => super.visitSQLFunction(funcName, inputs)
         }
       } else {
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support function: 
$funcName");
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3177",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index bee870fcf7b7..4052f5544f80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
@@ -325,8 +326,11 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       } else {
         // The framework will catch the error and give up the push-down.
         // Please see `JdbcDialect.compileExpression(expr: Expression)` for 
more details.
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support function: 
$funcName")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3177",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       }
     }
 
@@ -335,8 +339,11 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       if (isSupportedFunction(funcName)) {
         super.visitAggregateFunction(dialectFunctionName(funcName), 
isDistinct, inputs)
       } else {
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support aggregate 
function: $funcName")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3177",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       }
     }
 
@@ -349,9 +356,11 @@ abstract class JdbcDialect extends Serializable with 
Logging {
         super.visitInverseDistributionFunction(
           dialectFunctionName(funcName), isDistinct, inputs, 
orderingWithinGroups)
       } else {
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support " +
-            s"inverse distribution function: $funcName")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3178",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       }
     }
 
@@ -361,8 +370,11 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       if (isSupportedFunction("OVERLAY")) {
         super.visitOverlay(inputs)
       } else {
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support function: OVERLAY")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3177",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> "OVERLAY"))
       }
     }
 
@@ -370,8 +382,11 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       if (isSupportedFunction("TRIM")) {
         super.visitTrim(direction, inputs)
       } else {
-        throw new UnsupportedOperationException(
-          s"${this.getClass.getSimpleName} does not support function: TRIM")
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3177",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> "TRIM"))
       }
     }
   }
@@ -602,7 +617,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       columns: Array[NamedReference],
       columnsProperties: util.Map[NamedReference, util.Map[String, String]],
       properties: util.Map[String, String]): String = {
-    throw new UnsupportedOperationException("createIndex is not supported")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3179")
   }
 
   /**
@@ -619,7 +634,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       indexName: String,
       tableIdent: Identifier,
       options: JDBCOptions): Boolean = {
-    throw new UnsupportedOperationException("indexExists is not supported")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3180")
   }
 
   /**
@@ -630,7 +645,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
    * @return the SQL statement to use for dropping the index.
    */
   def dropIndex(indexName: String, tableIdent: Identifier): String = {
-    throw new UnsupportedOperationException("dropIndex is not supported")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3181")
   }
 
   /**
@@ -640,7 +655,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
       conn: Connection,
       tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
-    throw new UnsupportedOperationException("listIndexes is not supported")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182")
   }
 
   /**
@@ -716,7 +731,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
   def supportsTableSample: Boolean = false
 
   def getTableSample(sample: TableSampleInfo): String =
-    throw new UnsupportedOperationException("TableSample is not supported by 
this data source")
+    throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
 
   /**
    * Return the DB-specific quoted and fully qualified table name
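
The comment retained in the JdbcDialect hunk above explains why throwing is safe here: `JdbcDialect.compileExpression` catches the failure and gives up the push-down for that expression. A rough, hedged sketch of that catch-and-fall-back shape (not the real implementation; `buildSql` stands in for the dialect's SQL builder):

```scala
import scala.util.control.NonFatal

// If the dialect's SQL builder throws (including the
// SparkUnsupportedOperationException cases above, which are runtime
// exceptions and therefore NonFatal), the expression is simply not pushed down.
def compileOrSkip(buildSql: => String): Option[String] = {
  try {
    Some(buildSql)
  } catch {
    case NonFatal(_) => None
  }
}
```
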
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index cbed1d1e6384..5a434a935e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 import scala.collection.mutable.ArrayBuilder
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, 
NoSuchIndexException}
@@ -68,8 +69,11 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
     override def visitAggregateFunction(
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && 
distinctUnsupportedAggregateFunctions.contains(funcName)) {
-        throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3184",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index b6c98eedc16d..6d4dc060b636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -22,6 +22,7 @@ import java.util.{Locale, TimeZone}
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
@@ -55,8 +56,11 @@ private case object OracleDialect extends JdbcDialect {
     override def visitAggregateFunction(
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && 
distinctUnsupportedAggregateFunctions.contains(funcName)) {
-        throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+        throw new SparkUnsupportedOperationException(
+          errorClass = "_LEGACY_ERROR_TEMP_3184",
+          messageParameters = Map(
+            "class" -> this.getClass.getSimpleName,
+            "funcName" -> funcName))
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala
index 7e1dfacca4a2..eb6a5a2a6f63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.util
 
 import java.{lang => jl}
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.util.AccumulatorV2
 
 /**
@@ -77,8 +78,11 @@ class MapperRowCounter extends AccumulatorV2[jl.Long, 
java.util.List[(jl.Integer
     case o: MapperRowCounter =>
       this.synchronized(getOrCreate.addAll(o.value))
     case _ =>
-      throw new UnsupportedOperationException(
-        s"Cannot merge ${this.getClass.getName} with 
${other.getClass.getName}")
+      throw new SparkUnsupportedOperationException(
+        errorClass = "_LEGACY_ERROR_TEMP_3165",
+        messageParameters = Map(
+          "classA" -> this.getClass.getName,
+          "classB" -> other.getClass.getName))
   }
 
   override def value: java.util.List[(jl.Integer, jl.Long)] = 
this.synchronized(getOrCreate)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f0f48026a4a0..78776b7efbca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.Assertions._
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.prop.TableDrivenPropertyChecks._
 
-import org.apache.spark.{SparkConf, SparkException, TaskContext}
+import org.apache.spark.{SparkConf, SparkException, 
SparkUnsupportedOperationException, TaskContext}
 import org.apache.spark.TestUtils.withListener
 import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -2797,11 +2797,11 @@ case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]])
  */
 case class NonSerializableCaseClass(value: String) extends Externalizable {
   override def readExternal(in: ObjectInput): Unit = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def writeExternal(out: ObjectOutput): Unit = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 03b8ca32f561..6692f469cf42 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
-import org.apache.spark.{SparkException, SparkFileNotFoundException, 
SparkRuntimeException}
+import org.apache.spark.{SparkException, SparkFileNotFoundException, 
SparkRuntimeException, SparkUnsupportedOperationException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GreaterThan, Literal}
@@ -1257,9 +1257,9 @@ object TestingUDT {
 
     override def sqlType: DataType = CalendarIntervalType
     override def serialize(obj: IntervalData): Any =
-      throw new UnsupportedOperationException("Not implemented")
+      throw SparkUnsupportedOperationException()
     override def deserialize(datum: Any): IntervalData =
-      throw new UnsupportedOperationException("Not implemented")
+      throw SparkUnsupportedOperationException()
     override def userClass: Class[IntervalData] = classOf[IntervalData]
   }
 
@@ -1270,9 +1270,9 @@ object TestingUDT {
 
     override def sqlType: DataType = NullType
     override def serialize(obj: NullData): Any =
-      throw new UnsupportedOperationException("Not implemented")
+      throw SparkUnsupportedOperationException()
     override def deserialize(datum: Any): NullData =
-      throw new UnsupportedOperationException("Not implemented")
+      throw SparkUnsupportedOperationException()
     override def userClass: Class[NullData] = classOf[NullData]
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index f896997b57c3..141581e75884 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -24,6 +24,7 @@ import 
test.org.apache.spark.sql.connector.catalog.functions.JavaLongAdd._
 import test.org.apache.spark.sql.connector.catalog.functions.JavaRandomAdd._
 import test.org.apache.spark.sql.connector.catalog.functions.JavaStrLen._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.{FALLBACK, 
NO_CODEGEN}
@@ -807,7 +808,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase 
{
     override def description(): String = name()
 
     override def bind(inputType: StructType): BoundFunction = {
-      throw new UnsupportedOperationException(s"Not implemented")
+      throw SparkUnsupportedOperationException()
     }
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index fbcbf287b455..a7fb2c054e80 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1302,7 +1302,7 @@ object ColumnarReaderFactory extends 
PartitionReaderFactory {
   override def supportColumnarReads(partition: InputPartition): Boolean = true
 
   override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
-    throw new UnsupportedOperationException
+    throw SparkUnsupportedOperationException()
   }
 
   override def createColumnarReader(partition: InputPartition): 
PartitionReader[ColumnarBatch] = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
index d7a8225a7d08..6a3d6054301e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connector
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Literal}
@@ -239,10 +240,10 @@ private class TestStreamSourceProvider extends 
StreamSourceProvider {
     new Source {
       override def schema: StructType = TableCapabilityCheckSuite.schema
       override def getOffset: Option[Offset] = {
-        throw new UnsupportedOperationException
+        throw SparkUnsupportedOperationException()
       }
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        throw new UnsupportedOperationException
+        throw SparkUnsupportedOperationException()
       }
       override def stop(): Unit = {}
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
index c5be222645b1..b876240ddc37 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connector
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, 
SQLContext}
 import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, 
Identifier, SupportsRead, Table, TableCapability}
@@ -105,7 +106,7 @@ class V1ReadFallbackCatalog extends 
BasicInMemoryTableCatalog {
       properties: java.util.Map[String, String]): Table = {
     // To simplify the test implementation, only support fixed schema.
     if (schema != V1ReadFallbackCatalog.schema || partitions.nonEmpty) {
-      throw new UnsupportedOperationException
+      throw SparkUnsupportedOperationException()
     }
     val table = new TableWithV1ReadFallback(ident.toString)
     tables.put(ident, table)
@@ -188,7 +189,7 @@ class V1TableScan(
     } else if (requiredSchema.map(_.name) == Seq("j")) {
       data.map(row => Row(row.getInt(1)))
     } else {
-      throw new UnsupportedOperationException
+      throw SparkUnsupportedOperationException()
     }
 
     SparkSession.active.sparkContext.makeRDD(result)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
index 75223a779d22..e412cc7f776d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -53,12 +54,12 @@ class ColumnarRulesSuite extends PlanTest with 
SharedSparkSession {
 }
 
 case class LeafOp(override val supportsColumnar: Boolean) extends LeafExecNode 
{
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+  override protected def doExecute(): RDD[InternalRow] = throw 
SparkUnsupportedOperationException()
   override def output: Seq[Attribute] = Seq.empty
 }
 
 case class UnaryOp(child: SparkPlan, override val supportsColumnar: Boolean) 
extends UnaryExecNode {
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+  override protected def doExecute(): RDD[InternalRow] = throw 
SparkUnsupportedOperationException()
   override def output: Seq[Attribute] = child.output
   override protected def withNewChildInternal(newChild: SparkPlan): UnaryOp = 
copy(child = newChild)
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be532ed9097c..15de4c5cc5b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -1397,7 +1398,7 @@ private case class DummySparkPlan(
     override val requiredChildDistribution: Seq[Distribution] = Nil,
     override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil
   ) extends SparkPlan {
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException
+  override protected def doExecute(): RDD[InternalRow] = throw 
SparkUnsupportedOperationException()
   override def output: Seq[Attribute] = Seq.empty
   override protected def withNewChildrenInternal(newChildren: 
IndexedSeq[SparkPlan]): SparkPlan =
     copy(children = newChildren)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 058719f265d0..966f4e747122 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.{SparkEnv, SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
@@ -152,7 +152,7 @@ case class ColumnarOp(child: SparkPlan) extends 
UnaryExecNode {
   override val supportsColumnar: Boolean = true
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
     RowToColumnarExec(child).executeColumnar()
-  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+  override protected def doExecute(): RDD[InternalRow] = throw 
SparkUnsupportedOperationException()
   override def output: Seq[Attribute] = child.output
   override protected def withNewChildInternal(newChild: SparkPlan): ColumnarOp 
=
     copy(child = newChild)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
index 050c4ede7cf3..4c1429e90b95 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.columnar
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.types._
 
@@ -37,6 +38,8 @@ object ColumnarDataTypeUtils {
     case PhysicalStructType(fields) => StructType(fields)
     case PhysicalMapType(keyType, valueType, valueContainsNull) =>
       MapType(keyType, valueType, valueContainsNull)
-    case _ => throw new UnsupportedOperationException()
+    case unsupportedType => throw new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_3162",
+      messageParameters = Map("type" -> unsupportedType.toString))
   }
 }
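
Finally, on the test side the new exceptions can be asserted by error class rather than by message text. A self-contained sketch (the suite name, the thrown call, and the parameter value are illustrative; it assumes a build in which `_LEGACY_ERROR_TEMP_3162` is registered):

```scala
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.SparkUnsupportedOperationException

class LegacyErrorClassSketchSuite extends AnyFunSuite {
  test("error class and parameters are exposed to tests") {
    val e = intercept[SparkUnsupportedOperationException] {
      throw new SparkUnsupportedOperationException(
        errorClass = "_LEGACY_ERROR_TEMP_3162",
        messageParameters = Map("type" -> "SomeUnsupportedPhysicalType"))
    }
    assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_3162")
    assert(e.getMessageParameters.get("type") == "SomeUnsupportedPhysicalType")
  }
}
```
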


