This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fa60a7e216e6 [SPARK-46795][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql/core` fa60a7e216e6 is described below commit fa60a7e216e63b1edb199b1610b26197815c656b Author: Max Gekk <max.g...@gmail.com> AuthorDate: Tue Jan 23 20:42:16 2024 +0300 [SPARK-46795][SQL] Replace `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql/core` ### What changes were proposed in this pull request? In the PR, I propose to replace all `UnsupportedOperationException` by `SparkUnsupportedOperationException` in `sql/core` code base, and introduce new legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix. ### Why are the changes needed? To unify Spark SQL exceptions, and port Java exceptions to Spark exceptions with error classes. ### Does this PR introduce _any_ user-facing change? Yes, it can if the user's code assumes some particular format of `UnsupportedOperationException` messages. ### How was this patch tested? By running the modified test suites: ``` $ build/sbt "core/testOnly *SparkThrowableSuite" $ build/sbt "test:testOnly *FileBasedDataSourceSuite" $ build/sbt "test:testOnly *ColumnarRulesSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44772 from MaxGekk/migrate-UnsupportedOperationException-sql. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 150 +++++++++++++++++++++ .../sql/execution/columnar/ColumnDictionary.java | 3 +- .../datasources/orc/OrcArrayColumnVector.java | 25 ++-- .../datasources/orc/OrcAtomicColumnVector.java | 9 +- .../datasources/orc/OrcMapColumnVector.java | 25 ++-- .../datasources/orc/OrcStructColumnVector.java | 25 ++-- .../parquet/ParquetVectorUpdaterFactory.java | 3 +- .../parquet/VectorizedColumnReader.java | 16 ++- .../parquet/VectorizedParquetRecordReader.java | 3 +- .../parquet/VectorizedPlainValuesReader.java | 3 +- .../datasources/parquet/VectorizedReaderBase.java | 51 +++---- .../parquet/VectorizedRleValuesReader.java | 41 +++--- .../execution/vectorized/ColumnVectorUtils.java | 4 +- .../execution/vectorized/MutableColumnarRow.java | 10 +- .../spark/sql/artifact/ArtifactManager.scala | 5 +- .../spark/sql/execution/SparkStrategies.scala | 7 +- .../spark/sql/execution/UnsafeRowSerializer.scala | 15 ++- .../sql/execution/WholeStageCodegenExec.scala | 12 +- .../execution/adaptive/OptimizeSkewedJoin.scala | 3 +- .../execution/aggregate/SortAggregateExec.scala | 5 +- .../columnar/GenerateColumnAccessor.scala | 11 +- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 +- .../datasources/parquet/ParquetUtils.scala | 17 ++- .../spark/sql/execution/joins/HashedRelation.scala | 28 ++-- .../FlatMapGroupsInPandasWithStateExec.scala | 6 +- .../streaming/AcceptsLatestSeenOffsetHandler.scala | 7 +- .../AvailableNowMicroBatchStreamWrapper.scala | 5 +- .../streaming/AvailableNowSourceWrapper.scala | 5 +- .../sql/execution/streaming/GroupStateImpl.scala | 4 +- .../sources/RatePerMicroBatchProvider.scala | 3 +- .../sources/RatePerMicroBatchStream.scala | 4 +- .../streaming/state/HDFSBackedStateStoreMap.scala | 3 +- .../sql/execution/window/WindowFunctionFrame.scala | 5 +- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 8 +- 
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 15 ++- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 47 ++++--- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 8 +- .../org/apache/spark/sql/jdbc/OracleDialect.scala | 8 +- .../apache/spark/sql/util/MapperRowCounter.scala | 8 +- .../scala/org/apache/spark/sql/DatasetSuite.scala | 6 +- .../spark/sql/FileBasedDataSourceSuite.scala | 10 +- .../sql/connector/DataSourceV2FunctionSuite.scala | 3 +- .../spark/sql/connector/DataSourceV2Suite.scala | 2 +- .../sql/connector/TableCapabilityCheckSuite.scala | 5 +- .../spark/sql/connector/V1ReadFallbackSuite.scala | 5 +- .../spark/sql/execution/ColumnarRulesSuite.scala | 5 +- .../apache/spark/sql/execution/PlannerSuite.scala | 3 +- .../spark/sql/execution/SparkPlanSuite.scala | 4 +- .../execution/columnar/ColumnarDataTypeUtils.scala | 5 +- 49 files changed, 451 insertions(+), 216 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 8f4e04ba5456..6088300f8e64 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -7297,6 +7297,156 @@ "" ] }, + "_LEGACY_ERROR_TEMP_3161" : { + "message" : [ + "Uploading artifact file to local file system destination path is not supported." + ] + }, + "_LEGACY_ERROR_TEMP_3162" : { + "message" : [ + "Unsupported physical type <type>." + ] + }, + "_LEGACY_ERROR_TEMP_3163" : { + "message" : [ + "Unsupported number of children: <num>." + ] + }, + "_LEGACY_ERROR_TEMP_3165" : { + "message" : [ + "Cannot merge <classA> with <classB>" + ] + }, + "_LEGACY_ERROR_TEMP_3166" : { + "message" : [ + "latestOffset(Offset, ReadLimit) should be called instead of this method" + ] + }, + "_LEGACY_ERROR_TEMP_3167" : { + "message" : [ + "continuous mode is not supported!" 
+ ] + }, + "_LEGACY_ERROR_TEMP_3168" : { + "message" : [ + "hasTimedOut is true however there's no timeout configured" + ] + }, + "_LEGACY_ERROR_TEMP_3169" : { + "message" : [ + "AcceptsLatestSeenOffset is not supported with DSv1 streaming source: <unsupportedSources>" + ] + }, + "_LEGACY_ERROR_TEMP_3170" : { + "message" : [ + "SortAggregate code-gen does not support grouping keys" + ] + }, + "_LEGACY_ERROR_TEMP_3171" : { + "message" : [ + "Number of nulls not set for Parquet file <filePath>. Set SQLConf <config> to false and execute again." + ] + }, + "_LEGACY_ERROR_TEMP_3172" : { + "message" : [ + "No min/max found for Parquet file <filePath>. Set SQLConf <config> to false and execute again." + ] + }, + "_LEGACY_ERROR_TEMP_3173" : { + "message" : [ + "Cannot specify 'USING index_type' in 'CREATE INDEX'" + ] + }, + "_LEGACY_ERROR_TEMP_3175" : { + "message" : [ + "Index Type <v> is not supported. The supported Index Types are: <supportedIndexTypeList>" + ] + }, + "_LEGACY_ERROR_TEMP_3176" : { + "message" : [ + "applyInPandasWithState is unsupported in batch query. Use applyInPandas instead." 
+ ] + }, + "_LEGACY_ERROR_TEMP_3177" : { + "message" : [ + "<class> does not support function: <funcName>" + ] + }, + "_LEGACY_ERROR_TEMP_3178" : { + "message" : [ + "<class> does not support inverse distribution function: <funcName>" + ] + }, + "_LEGACY_ERROR_TEMP_3179" : { + "message" : [ + "createIndex is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3180" : { + "message" : [ + "indexExists is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3181" : { + "message" : [ + "dropIndex is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3182" : { + "message" : [ + "listIndexes is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3183" : { + "message" : [ + "TableSample is not supported by this data source" + ] + }, + "_LEGACY_ERROR_TEMP_3184" : { + "message" : [ + "<class> does not support aggregate function: <funcName> with DISTINCT" + ] + }, + "_LEGACY_ERROR_TEMP_3185" : { + "message" : [ + "Schema evolution not supported." + ] + }, + "_LEGACY_ERROR_TEMP_3186" : { + "message" : [ + "Boolean is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_3187" : { + "message" : [ + "only readInts is valid." 
+ ] + }, + "_LEGACY_ERROR_TEMP_3188" : { + "message" : [ + "only skipIntegers is valid" + ] + }, + "_LEGACY_ERROR_TEMP_3189" : { + "message" : [ + "Unsupported encoding: <encoding>" + ] + }, + "_LEGACY_ERROR_TEMP_3190" : { + "message" : [ + "RLE encoding is not supported for values of type: <typeName>" + ] + }, + "_LEGACY_ERROR_TEMP_3191" : { + "message" : [ + "Dictionary encoding does not support String" + ] + }, + "_LEGACY_ERROR_TEMP_3192" : { + "message" : [ + "Datatype not supported <dt>" + ] + }, "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : { "message" : [ "<errorMessage>" diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java index 29271fc5c0a2..523dde831342 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/columnar/ColumnDictionary.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.execution.vectorized.Dictionary; public final class ColumnDictionary implements Dictionary { @@ -59,6 +60,6 @@ public final class ColumnDictionary implements Dictionary { @Override public byte[] decodeToBinary(int id) { - throw new UnsupportedOperationException("Dictionary encoding does not support String"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3191"); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java index b0c818f5a4df..bfed046c9d39 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java @@ -20,6 +20,7 
@@ package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -52,61 +53,61 @@ public class OrcArrayColumnVector extends OrcColumnVector { @Override public boolean getBoolean(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte getByte(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public short getShort(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public int getInt(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public long getLong(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public float getFloat(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public double getDouble(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public Decimal getDecimal(int rowId, int precision, int scale) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public UTF8String getUTF8String(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte[] getBinary(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public ColumnarMap getMap(int rowId) { - throw new 
UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java index f120482f63fa..36e5da64bb75 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcAtomicColumnVector.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import org.apache.hadoop.hive.ql.exec.vector.*; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.types.DataType; @@ -71,7 +72,7 @@ public class OrcAtomicColumnVector extends OrcColumnVector { } else if (vector instanceof TimestampColumnVector timestampColumnVector) { timestampData = timestampColumnVector; } else { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } @@ -146,16 +147,16 @@ public class OrcAtomicColumnVector extends OrcColumnVector { @Override public ColumnarArray getArray(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public ColumnarMap getMap(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java index 7eedd8b59412..a6d82360364f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.MapType; @@ -55,61 +56,61 @@ public class OrcMapColumnVector extends OrcColumnVector { @Override public boolean getBoolean(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte getByte(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public short getShort(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public int getInt(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public long getLong(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public float getFloat(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public double getDouble(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public Decimal getDecimal(int rowId, int precision, int scale) { - throw new 
UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public UTF8String getUTF8String(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte[] getBinary(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public ColumnarArray getArray(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java index 48e540d22095..d675beb6536e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcStructColumnVector.java @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.orc; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.StructType; @@ -45,61 +46,61 @@ public class OrcStructColumnVector extends OrcColumnVector { @Override public boolean getBoolean(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte getByte(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public short getShort(int rowId) { - throw new UnsupportedOperationException(); + throw 
SparkUnsupportedOperationException.apply(); } @Override public int getInt(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public long getLong(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public float getFloat(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public double getDouble(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public Decimal getDecimal(int rowId, int precision, int scale) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public UTF8String getUTF8String(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte[] getBinary(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public ColumnarArray getArray(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public ColumnarMap getMap(int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 3863818b0255..0d8713b58cec 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -27,6 +27,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotat import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.datasources.DataSourceUtils; @@ -279,7 +280,7 @@ public class ParquetVectorUpdaterFactory { WritableColumnVector values, WritableColumnVector dictionaryIds, Dictionary dictionary) { - throw new UnsupportedOperationException("Boolean is not supported"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3186"); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 6479644968ed..d580023bc877 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; import java.time.ZoneId; +import java.util.Map; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; @@ -37,6 +38,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotat import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; import org.apache.parquet.schema.PrimitiveType; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -336,7 +338,8 @@ public class VectorizedColumnReader { @SuppressWarnings("deprecation") Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow 
warning suppression if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { - throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", dataEncoding.toString())); } this.dataColumn = new VectorizedRleValuesReader(); this.isCurrentPageDictionaryEncoded = true; @@ -371,18 +374,21 @@ public class VectorizedColumnReader { if (typeName == BOOLEAN) { yield new VectorizedRleValuesReader(1); } else { - throw new UnsupportedOperationException( - "RLE encoding is not supported for values of type: " + typeName); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3190", Map.of("typeName", typeName.toString())); } } - default -> throw new UnsupportedOperationException("Unsupported encoding: " + encoding); + default -> + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", encoding.toString())); }; } private int readPageV1(DataPageV1 page) throws IOException { if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) { - throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding()); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3189", Map.of("encoding", page.getDlEncoding().toString())); } int pageValueCount = page.getValueCount(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index baefa254466f..ca9d6f3c5db0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import 
java.util.Set; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import scala.Option; import scala.jdk.javaapi.CollectionConverters; @@ -375,7 +376,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa ColumnDescriptor desc = column.descriptor().get(); ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath()); if (!fd.equals(desc)) { - throw new UnsupportedOperationException("Schema evolution not supported."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3185"); } } else { for (ParquetColumn childColumn : CollectionConverters.asJava(column.children())) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index fb40a131d2a6..4316e49d5b94 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -26,6 +26,7 @@ import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; @@ -50,7 +51,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori @Override public void skip() { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } private void updateCurrentByte() { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java index b6715f1e7a07..ab8fd9bdb6ff 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedReaderBase.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.api.Binary; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; /** @@ -28,129 +29,129 @@ public class VectorizedReaderBase extends ValuesReader implements VectorizedValu @Override public void skip() { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public byte readByte() { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public short readShort() { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public Binary readBinary(int len) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readBooleans(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readBytes(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readShorts(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readIntegers(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw 
SparkUnsupportedOperationException.apply(); } @Override public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readLongs(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase, String timeZone) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readFloats(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readDoubles(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void readBinary(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipBooleans(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipBytes(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipShorts(int total) { - throw new UnsupportedOperationException(); + throw 
SparkUnsupportedOperationException.apply(); } @Override public void skipIntegers(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipLongs(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipFloats(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipDoubles(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipBinary(int total) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override public void skipFixedLenByteArray(int total, int len) { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 584aaa2d118b..0d380997fd5b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -29,6 +29,7 @@ import org.apache.parquet.column.values.bitpacking.Packer; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; /** @@ -709,43 +710,43 @@ public final class VectorizedRleValuesReader extends ValuesReader @Override public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new 
SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readUnsignedLongs(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readIntegersWithRebase( int total, WritableColumnVector c, int rowId, boolean failIfRebase) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public byte readByte() { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public short readShort() { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readBytes(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readShorts(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readLongs(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override @@ -755,12 +756,12 @@ public final class VectorizedRleValuesReader extends ValuesReader int rowId, boolean failIfRebase, String timeZone) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readBinary(int total, WritableColumnVector c, int rowId) { - throw 
new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override @@ -786,17 +787,17 @@ public final class VectorizedRleValuesReader extends ValuesReader @Override public void readFloats(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public void readDoubles(int total, WritableColumnVector c, int rowId) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override public Binary readBinary(int len) { - throw new UnsupportedOperationException("only readInts is valid."); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3187"); } @Override @@ -811,37 +812,37 @@ public final class VectorizedRleValuesReader extends ValuesReader @Override public void skipBytes(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void skipShorts(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void skipLongs(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void skipFloats(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void skipDoubles(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void 
skipBinary(int total) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } @Override public void skipFixedLenByteArray(int total, int len) { - throw new UnsupportedOperationException("only skipIntegers is valid"); + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3188"); } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 29c106651acf..9ff385c995ff 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -182,7 +183,8 @@ public class ColumnVectorUtils { } else if (t instanceof TimestampNTZType) { dst.appendLong(DateTimeUtils.localDateTimeToMicros((LocalDateTime) o)); } else { - throw new UnsupportedOperationException("Type " + t); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3192", Map.of("dt", t.toString())); } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java index a6b353a2e849..0464fe815989 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.vectorized; import java.math.BigDecimal; +import java.util.Map; +import 
org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.types.*; @@ -96,7 +98,7 @@ public final class MutableColumnarRow extends InternalRow { @Override public boolean anyNull() { - throw new UnsupportedOperationException(); + throw SparkUnsupportedOperationException.apply(); } @Override @@ -196,7 +198,8 @@ public final class MutableColumnarRow extends InternalRow { } else if (dataType instanceof MapType) { return getMap(ordinal); } else { - throw new UnsupportedOperationException("Datatype not supported " + dataType); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString())); } } @@ -224,7 +227,8 @@ public final class MutableColumnarRow extends InternalRow { } else if (dt instanceof CalendarIntervalType) { setInterval(ordinal, (CalendarInterval) value); } else { - throw new UnsupportedOperationException("Datatype not supported " + dt); + throw new SparkUnsupportedOperationException( + "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dt.toString())); } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala index 69a5fd860740..47f5dbdac488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala @@ -29,7 +29,7 @@ import scala.reflect.ClassTag import org.apache.commons.io.{FilenameUtils, FileUtils} import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath} -import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkEnv} +import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{CONNECT_SCALA_UDF_STUB_PREFIXES, 
EXECUTOR_USER_CLASS_PATH_FIRST} import org.apache.spark.sql.SparkSession @@ -275,8 +275,7 @@ class ArtifactManager(session: SparkSession) extends Logging { // `spark.sql.artifact.copyFromLocalToFs.allowDestLocal` // to `true` when starting spark driver, we should only enable it for testing // purpose. - throw new UnsupportedOperationException( - "Uploading artifact file to local file system destination path is not supported.") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3161") } } fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cbb34d6d484f..60079152781d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import java.util.Locale -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -59,7 +59,7 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode { override def output: Seq[Attribute] = plan.output protected override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException() + throw SparkUnsupportedOperationException() } } @@ -863,8 +863,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ) :: Nil case _: FlatMapGroupsInPandasWithState => // TODO(SPARK-40443): support applyInPandasWithState in batch query - throw new UnsupportedOperationException( - "applyInPandasWithState is unsupported in batch query. 
Use applyInPandas instead.") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3176") case logical.CoGroup( f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, lOrder, rOrder, oAttr, left, right) => execution.CoGroupExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 8ab553369de6..8563bbcd7960 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import com.google.common.io.ByteStreams +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.metric.SQLMetric @@ -79,12 +80,12 @@ private class UnsafeRowSerializerInstance( override def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream = { // This method is never called by shuffle code. - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def writeObject[T: ClassTag](t: T): SerializationStream = { // This method is never called by shuffle code. - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def flush(): Unit = { @@ -145,7 +146,7 @@ private class UnsafeRowSerializerInstance( override def asIterator: Iterator[Any] = { // This method is never called by shuffle code. - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def readKey[T: ClassTag](): T = { @@ -166,7 +167,7 @@ private class UnsafeRowSerializerInstance( override def readObject[T: ClassTag](): T = { // This method is never called by shuffle code. 
- throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def close(): Unit = { @@ -176,9 +177,9 @@ private class UnsafeRowSerializerInstance( } // These methods are never called by shuffle code. - override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException + override def serialize[T: ClassTag](t: T): ByteBuffer = throw SparkUnsupportedOperationException() override def deserialize[T: ClassTag](bytes: ByteBuffer): T = - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 058df24fc13d..6fbb43a95a07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.mutable import scala.util.control.NonFatal -import org.apache.spark.{broadcast, SparkException} +import org.apache.spark.{broadcast, SparkException, SparkUnsupportedOperationException} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -339,7 +339,7 @@ trait CodegenSupport extends SparkPlan { * different inputs(join build side, aggregate buffer, etc.), or other special cases. 
*/ def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** @@ -356,7 +356,9 @@ trait CodegenSupport extends SparkPlan { } else if (children.length == 1) { children.head.asInstanceOf[CodegenSupport].needCopyResult } else { - throw new UnsupportedOperationException + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3163", + messageParameters = Map("num" -> children.length.toString)) } } @@ -779,11 +781,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) } override def inputRDDs(): Seq[RDD[InternalRow]] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def doProduce(ctx: CodegenContext): String = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 37cdea084d8a..5ec56e86d795 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.commons.io.FileUtils +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} @@ -260,7 +261,7 @@ case class OptimizeSkewedJoin(ensureRequirements: EnsureRequirements) // caused by skew join optimization. However, this shouldn't apply to the sub-plan under skew join, // as it's guaranteed to satisfy distribution requirement. 
case class SkewJoinChildWrapper(plan: SparkPlan) extends LeafExecNode { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException() override def output: Seq[Attribute] = plan.output override def outputPartitioning: Partitioning = plan.outputPartitioning override def outputOrdering: Seq[SortOrder] = plan.outputOrdering diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 6042ff7b2caf..c4ff2454ae67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -98,11 +99,11 @@ case class SortAggregateExec( protected override def needHashTable: Boolean = false protected override def doProduceWithKeys(ctx: CodegenContext): String = { - throw new UnsupportedOperationException("SortAggregate code-gen does not support grouping keys") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3170") } protected override def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = { - throw new UnsupportedOperationException("SortAggregate code-gen does not support grouping keys") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3170") } override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index 33918bcee738..5eadc7d47c92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -51,16 +52,16 @@ class MutableUnsafeRow(val writer: UnsafeRowWriter) extends BaseGenericInternalR // the writer will be used directly to avoid creating wrapper objects override def setDecimal(i: Int, v: Decimal, precision: Int): Unit = - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() override def setInterval(i: Int, value: CalendarInterval): Unit = - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() - override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException + override def update(i: Int, v: Any): Unit = throw SparkUnsupportedOperationException() // all other methods inherited from GenericMutableRow are not need - override protected def genericGet(ordinal: Int): Any = throw new UnsupportedOperationException - override def numFields: Int = throw new UnsupportedOperationException + override protected def genericGet(ordinal: Int): Any = throw SparkUnsupportedOperationException() + override def numFields: Int = throw SparkUnsupportedOperationException() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 51daea76abc5..9fb10f42164f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._ import scala.util.Try import scala.util.control.NonFatal -import org.apache.spark.{SparkThrowable, TaskContext} +import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row} @@ -1132,8 +1132,11 @@ object JdbcUtils extends Logging with SQLConfHelper { if (containsIndexTypeIgnoreCase(supportedIndexTypeList, v)) { indexType = s"USING $v" } else { - throw new UnsupportedOperationException(s"Index Type $v is not supported." + - s" The supported Index Types are: ${supportedIndexTypeList.mkString(" AND ")}") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3175", + messageParameters = Map( + "v" -> v, + "supportedIndexTypeList" -> supportedIndexTypeList.mkString(" AND "))) } } else { indexPropertyList.append(s"$k = $v") @@ -1145,8 +1148,7 @@ object JdbcUtils extends Logging with SQLConfHelper { def containsIndexTypeIgnoreCase(supportedIndexTypeList: Array[String], value: String): Boolean = { if (supportedIndexTypeList.isEmpty) { - throw new UnsupportedOperationException( - "Cannot specify 'USING index_type' in 'CREATE INDEX'") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3173") } for (indexType <- supportedIndexTypeList) { if (value.equalsIgnoreCase(indexType)) return true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index bd6b5bfeb4da..5020bf7333de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ 
-32,7 +32,7 @@ import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.{PrimitiveType, Types} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession @@ -394,8 +394,11 @@ object ParquetUtils extends Logging { isMax: Boolean): Any = { val statistics = columnChunkMetaData.get(i).getStatistics if (!statistics.hasNonNullValue) { - throw new UnsupportedOperationException(s"No min/max found for Parquet file $filePath. " + - s"Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute again") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3172", + messageParameters = Map( + "filePath" -> filePath, + "config" -> PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key)) } else { if (isMax) statistics.genericGetMax else statistics.genericGetMin } @@ -407,9 +410,11 @@ object ParquetUtils extends Logging { i: Int): Long = { val statistics = columnChunkMetaData.get(i).getStatistics if (!statistics.isNumNullsSet) { - throw new UnsupportedOperationException(s"Number of nulls not set for Parquet file" + - s" $filePath. 
Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute" + - s" again") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3171", + messageParameters = Map( + "filePath" -> filePath, + "config" -> PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key)) } statistics.getNumNulls; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 16345bb35db2..85c198290542 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -22,7 +22,7 @@ import java.io._ import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} -import org.apache.spark.{SparkConf, SparkEnv, SparkException} +import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.config.{BUFFER_PAGESIZE, MEMORY_OFFHEAP_ENABLED} import org.apache.spark.memory._ import org.apache.spark.sql.catalyst.InternalRow @@ -52,7 +52,7 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation { * Returns null if there is no matched rows. */ def get(key: Long): Iterator[InternalRow] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** @@ -64,7 +64,7 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation { * Returns the matched single row with key that have only one column of LongType. */ def getValue(key: Long): InternalRow = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** @@ -73,7 +73,7 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation { * Returns null if there is no matched rows. 
*/ def getWithKeyIndex(key: InternalRow): Iterator[ValueRowWithKeyIndex] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** @@ -83,21 +83,21 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation { * Returns null if there is no matched rows. */ def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** * Returns an iterator for keys index and rows of InternalRow type. */ def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** * Returns the maximum number of allowed keys index. */ def maxNumKeysIndex: Int = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } /** @@ -1043,19 +1043,19 @@ class LongHashedRelation( override def keys(): Iterator[InternalRow] = map.keys() override def getWithKeyIndex(key: InternalRow): Iterator[ValueRowWithKeyIndex] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def maxNumKeysIndex: Int = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } } @@ -1126,11 +1126,11 @@ case object EmptyHashedRelation extends HashedRelation { */ case object HashedRelationWithAllNullKeys extends HashedRelation { override def get(key: InternalRow): Iterator[InternalRow] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def getValue(key: InternalRow): InternalRow = { - throw new 
UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def asReadOnlyCopy(): HashedRelationWithAllNullKeys.type = this @@ -1138,7 +1138,7 @@ case object HashedRelationWithAllNullKeys extends HashedRelation { override def keyIsUnique: Boolean = true override def keys(): Iterator[InternalRow] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def close(): Unit = {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala index 850ee016e363..eef0b3e3e846 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.python -import org.apache.spark.{JobArtifactSet, SparkException, TaskContext} +import org.apache.spark.{JobArtifactSet, SparkException, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -138,7 +138,7 @@ case class FlatMapGroupsInPandasWithStateExec( override def processNewDataWithInitialState( childDataIter: Iterator[InternalRow], initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = { - throw new UnsupportedOperationException("Should not reach here!") + throw SparkUnsupportedOperationException() } override def processTimedOutState(): Iterator[InternalRow] = { @@ -232,7 +232,7 @@ case class FlatMapGroupsInPandasWithStateExec( stateData: StateData, valueRowIter: Iterator[InternalRow], hasTimedOut: Boolean): Iterator[InternalRow] = { - throw new UnsupportedOperationException("Should not reach here!") + throw 
SparkUnsupportedOperationException() } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala index 69795cc82c47..bf4333d8e9dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AcceptsLatestSeenOffsetHandler.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.connector.read.streaming.{AcceptsLatestSeenOffset, SparkDataStream} /** @@ -47,9 +48,9 @@ object AcceptsLatestSeenOffsetHandler { .filter(_.isInstanceOf[Source]) if (unsupportedSources.nonEmpty) { - throw new UnsupportedOperationException( - "AcceptsLatestSeenOffset is not supported with DSv1 streaming source: " + - unsupportedSources) + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3169", + messageParameters = Map("unsupportedSources" -> unsupportedSources.toString())) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala index f60468c85e6e..3df358ae9bf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowMicroBatchStreamWrapper.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} import org.apache.spark.sql.connector.read.streaming import 
org.apache.spark.sql.connector.read.streaming.MicroBatchStream @@ -29,8 +30,8 @@ import org.apache.spark.sql.connector.read.streaming.MicroBatchStream class AvailableNowMicroBatchStreamWrapper(delegate: MicroBatchStream) extends AvailableNowDataStreamWrapper(delegate) with MicroBatchStream { - override def latestOffset(): streaming.Offset = throw new UnsupportedOperationException( - "latestOffset(Offset, ReadLimit) should be called instead of this method") + override def latestOffset(): streaming.Offset = + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166") override def planInputPartitions(start: streaming.Offset, end: streaming.Offset): Array[InputPartition] = delegate.planInputPartitions(start, end) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala index e971b1737ccc..af5713144ebb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowSourceWrapper.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType @@ -30,8 +31,8 @@ class AvailableNowSourceWrapper(delegate: Source) override def schema: StructType = delegate.schema - override def getOffset: Option[Offset] = throw new UnsupportedOperationException( - "latestOffset(Offset, ReadLimit) should be called instead of this method") + override def getOffset: Option[Offset] = + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166") override def getBatch(start: Option[Offset], end: Offset): DataFrame = delegate.getBatch(start, end) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index c9ade7b568e8..cb283699b4e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit import org.json4s._ import org.json4s.jackson.JsonMethods._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.api.java.Optional import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, NoTimeout, ProcessingTimeTimeout} import org.apache.spark.sql.catalyst.util.IntervalUtils @@ -217,8 +218,7 @@ private[sql] object GroupStateImpl { throw new IllegalArgumentException("eventTimeWatermarkMs must be 0 or positive if present") } if (hasTimedOut && timeoutConf == NoTimeout) { - throw new UnsupportedOperationException( - "hasTimedOut is true however there's no timeout configured") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3168") } new GroupStateImpl[S]( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala index 17cc1860fbdc..c518f6fe7bae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources import java.util +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder} @@ -109,7 +110,7 @@ class RatePerMicroBatchTable( advanceMillisPerBatch, 
options) override def toContinuousStream(checkpointLocation: String): ContinuousStream = { - throw new UnsupportedOperationException("continuous mode is not supported!") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3167") } override def columnarSupportMode(): Scan.ColumnarSupportMode = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala index 8dca7d40704a..d51f87cb1a57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.sources import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -38,8 +39,7 @@ class RatePerMicroBatchStream( override def initialOffset(): Offset = RatePerMicroBatchStreamOffset(0L, startTimestamp) override def latestOffset(): Offset = { - throw new UnsupportedOperationException( - "latestOffset(Offset, ReadLimit) should be called instead of this method") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3166") } override def getDefaultReadLimit: ReadLimit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala index 59b01b5b454d..32ff87f754d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreMap.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.types.{StructField, StructType} @@ -77,7 +78,7 @@ class NoPrefixHDFSBackedStateStoreMap extends HDFSBackedStateStoreMap { } override def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair] = { - throw new UnsupportedOperationException("Prefix scan is not supported!") + throw SparkUnsupportedOperationException() } override def clear(): Unit = map.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala index 4aa7444c407e..44d18b44e4ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.window import java.util +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences @@ -206,9 +207,9 @@ abstract class OffsetWindowFunctionFrameBase( inputIndex = offset } - override def currentLowerBound(): Int = throw new UnsupportedOperationException() + override def currentLowerBound(): Int = throw SparkUnsupportedOperationException() - override def currentUpperBound(): Int = throw new UnsupportedOperationException() + override def currentUpperBound(): Int = throw SparkUnsupportedOperationException() } /** diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index f745e466ed9e..62c31b1c4c5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -22,6 +22,7 @@ import java.util.Locale import scala.util.control.NonFatal +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.catalog.Identifier @@ -49,8 +50,11 @@ private object DB2Dialect extends JdbcDialect { override def visitAggregateFunction( funcName: String, isDistinct: Boolean, inputs: Array[String]): String = if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) { - throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " + - s"support aggregate function: $funcName with DISTINCT"); + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3184", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } else { super.visitAggregateFunction(funcName, isDistinct, inputs) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index cd151f790adf..74eca7e48577 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -27,6 +27,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, 
TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.Identifier @@ -268,8 +269,11 @@ private[sql] object H2Dialect extends JdbcDialect { override def visitAggregateFunction( funcName: String, isDistinct: Boolean, inputs: Array[String]): String = if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) { - throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " + - s"support aggregate function: $funcName with DISTINCT") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3184", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } else { super.visitAggregateFunction(funcName, isDistinct, inputs) } @@ -296,8 +300,11 @@ private[sql] object H2Dialect extends JdbcDialect { case _ => super.visitSQLFunction(funcName, inputs) } } else { - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support function: $funcName"); + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3177", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index bee870fcf7b7..4052f5544f80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException @@ -325,8 +326,11 @@ abstract class JdbcDialect extends Serializable with Logging { } else { // The framework will catch the error and give up the push-down. 
// Please see `JdbcDialect.compileExpression(expr: Expression)` for more details. - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support function: $funcName") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3177", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } } @@ -335,8 +339,11 @@ abstract class JdbcDialect extends Serializable with Logging { if (isSupportedFunction(funcName)) { super.visitAggregateFunction(dialectFunctionName(funcName), isDistinct, inputs) } else { - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support aggregate function: $funcName") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3177", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } } @@ -349,9 +356,11 @@ abstract class JdbcDialect extends Serializable with Logging { super.visitInverseDistributionFunction( dialectFunctionName(funcName), isDistinct, inputs, orderingWithinGroups) } else { - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support " + - s"inverse distribution function: $funcName") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3178", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } } @@ -361,8 +370,11 @@ abstract class JdbcDialect extends Serializable with Logging { if (isSupportedFunction("OVERLAY")) { super.visitOverlay(inputs) } else { - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support function: OVERLAY") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3177", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> "OVERLAY")) } } @@ -370,8 +382,11 @@ abstract class JdbcDialect extends Serializable with 
Logging { if (isSupportedFunction("TRIM")) { super.visitTrim(direction, inputs) } else { - throw new UnsupportedOperationException( - s"${this.getClass.getSimpleName} does not support function: TRIM") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3177", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> "TRIM")) } } } @@ -602,7 +617,7 @@ abstract class JdbcDialect extends Serializable with Logging { columns: Array[NamedReference], columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): String = { - throw new UnsupportedOperationException("createIndex is not supported") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3179") } /** @@ -619,7 +634,7 @@ abstract class JdbcDialect extends Serializable with Logging { indexName: String, tableIdent: Identifier, options: JDBCOptions): Boolean = { - throw new UnsupportedOperationException("indexExists is not supported") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3180") } /** @@ -630,7 +645,7 @@ abstract class JdbcDialect extends Serializable with Logging { * @return the SQL statement to use for dropping the index. 
*/ def dropIndex(indexName: String, tableIdent: Identifier): String = { - throw new UnsupportedOperationException("dropIndex is not supported") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3181") } /** @@ -640,7 +655,7 @@ abstract class JdbcDialect extends Serializable with Logging { conn: Connection, tableIdent: Identifier, options: JDBCOptions): Array[TableIndex] = { - throw new UnsupportedOperationException("listIndexes is not supported") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182") } /** @@ -716,7 +731,7 @@ abstract class JdbcDialect extends Serializable with Logging { def supportsTableSample: Boolean = false def getTableSample(sample: TableSampleInfo): String = - throw new UnsupportedOperationException("TableSample is not supported by this data source") + throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183") /** * Return the DB-specific quoted and fully qualified table name diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index cbed1d1e6384..5a434a935e96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -24,6 +24,7 @@ import java.util.Locale import scala.collection.mutable.ArrayBuilder import scala.util.control.NonFatal +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} @@ -68,8 +69,11 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { override def visitAggregateFunction( funcName: String, isDistinct: Boolean, inputs: Array[String]): String = if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) { - throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " + - s"support aggregate function: $funcName with DISTINCT"); + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3184", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } else { super.visitAggregateFunction(funcName, isDistinct, inputs) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index b6c98eedc16d..6d4dc060b636 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -22,6 +22,7 @@ import java.util.{Locale, TimeZone} import scala.util.control.NonFatal +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions @@ -55,8 +56,11 @@ private case object OracleDialect extends JdbcDialect { override def visitAggregateFunction( funcName: String, isDistinct: Boolean, inputs: Array[String]): String = if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) { - throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " + - s"support aggregate function: $funcName with DISTINCT"); + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3184", + messageParameters = Map( + "class" -> this.getClass.getSimpleName, + "funcName" -> funcName)) } else { super.visitAggregateFunction(funcName, isDistinct, inputs) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala index 7e1dfacca4a2..eb6a5a2a6f63 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/MapperRowCounter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.util import java.{lang => jl} +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.util.AccumulatorV2 /** @@ -77,8 +78,11 @@ class MapperRowCounter extends AccumulatorV2[jl.Long, java.util.List[(jl.Integer case o: MapperRowCounter => this.synchronized(getOrCreate.addAll(o.value)) case _ => - throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + throw new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_3165", + messageParameters = Map( + "classA" -> this.getClass.getName, + "classB" -> other.getClass.getName)) } override def value: java.util.List[(jl.Integer, jl.Long)] = this.synchronized(getOrCreate) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f0f48026a4a0..78776b7efbca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ -import org.apache.spark.{SparkConf, SparkException, TaskContext} +import org.apache.spark.{SparkConf, SparkException, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.TestUtils.withListener import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} @@ -2797,11 +2797,11 @@ case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]]) */ case class NonSerializableCaseClass(value: String) extends Externalizable { override def readExternal(in: ObjectInput): Unit = { - throw new 
UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def writeExternal(out: ObjectOutput): Unit = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 03b8ca32f561..6692f469cf42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{LocalFileSystem, Path} -import org.apache.spark.{SparkException, SparkFileNotFoundException, SparkRuntimeException} +import org.apache.spark.{SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUnsupportedOperationException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} @@ -1257,9 +1257,9 @@ object TestingUDT { override def sqlType: DataType = CalendarIntervalType override def serialize(obj: IntervalData): Any = - throw new UnsupportedOperationException("Not implemented") + throw SparkUnsupportedOperationException() override def deserialize(datum: Any): IntervalData = - throw new UnsupportedOperationException("Not implemented") + throw SparkUnsupportedOperationException() override def userClass: Class[IntervalData] = classOf[IntervalData] } @@ -1270,9 +1270,9 @@ object TestingUDT { override def sqlType: DataType = NullType override def serialize(obj: NullData): Any = - throw new UnsupportedOperationException("Not implemented") + throw SparkUnsupportedOperationException() override def deserialize(datum: Any): NullData = - throw new 
UnsupportedOperationException("Not implemented") + throw SparkUnsupportedOperationException() override def userClass: Class[NullData] = classOf[NullData] } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index f896997b57c3..141581e75884 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -24,6 +24,7 @@ import test.org.apache.spark.sql.connector.catalog.functions.JavaLongAdd._ import test.org.apache.spark.sql.connector.catalog.functions.JavaRandomAdd._ import test.org.apache.spark.sql.connector.catalog.functions.JavaStrLen._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.{FALLBACK, NO_CODEGEN} @@ -807,7 +808,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { override def description(): String = name() override def bind(inputType: StructType): BoundFunction = { - throw new UnsupportedOperationException(s"Not implemented") + throw SparkUnsupportedOperationException() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index fbcbf287b455..a7fb2c054e80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -1302,7 +1302,7 @@ object ColumnarReaderFactory extends PartitionReaderFactory { override def supportColumnarReads(partition: InputPartition): Boolean = true override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala index d7a8225a7d08..6a3d6054301e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/TableCapabilityCheckSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connector +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} @@ -239,10 +240,10 @@ private class TestStreamSourceProvider extends StreamSourceProvider { new Source { override def schema: StructType = TableCapabilityCheckSuite.schema override def getOffset: Option[Offset] = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } override def stop(): Unit = {} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala index c5be222645b1..b876240ddc37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connector +import org.apache.spark.SparkUnsupportedOperationException import 
org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession, SQLContext} import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, Identifier, SupportsRead, Table, TableCapability} @@ -105,7 +106,7 @@ class V1ReadFallbackCatalog extends BasicInMemoryTableCatalog { properties: java.util.Map[String, String]): Table = { // To simplify the test implementation, only support fixed schema. if (schema != V1ReadFallbackCatalog.schema || partitions.nonEmpty) { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } val table = new TableWithV1ReadFallback(ident.toString) tables.put(ident, table) @@ -188,7 +189,7 @@ class V1TableScan( } else if (requiredSchema.map(_.name) == Seq("j")) { data.map(row => Row(row.getInt(1))) } else { - throw new UnsupportedOperationException + throw SparkUnsupportedOperationException() } SparkSession.active.sparkContext.makeRDD(result) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala index 75223a779d22..e412cc7f776d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -53,12 +54,12 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession { } case class LeafOp(override val supportsColumnar: Boolean) extends LeafExecNode { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException() override def output: Seq[Attribute] 
= Seq.empty } case class UnaryOp(child: SparkPlan, override val supportsColumnar: Boolean) extends UnaryExecNode { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException() override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: SparkPlan): UnaryOp = copy(child = newChild) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be532ed9097c..15de4c5cc5b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow @@ -1397,7 +1398,7 @@ private case class DummySparkPlan( override val requiredChildDistribution: Seq[Distribution] = Nil, override val requiredChildOrdering: Seq[Seq[SortOrder]] = Nil ) extends SparkPlan { - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException + override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException() override def output: Seq[Attribute] = Seq.empty override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy(children = newChildren) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index 058719f265d0..966f4e747122 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.{SparkEnv, SparkException} +import org.apache.spark.{SparkEnv, SparkException, SparkUnsupportedOperationException} import org.apache.spark.rdd.RDD import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.InternalRow @@ -152,7 +152,7 @@ case class ColumnarOp(child: SparkPlan) extends UnaryExecNode { override val supportsColumnar: Boolean = true override protected def doExecuteColumnar(): RDD[ColumnarBatch] = RowToColumnarExec(child).executeColumnar() - override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException() + override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException() override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: SparkPlan): ColumnarOp = copy(child = newChild) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala index 050c4ede7cf3..4c1429e90b95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarDataTypeUtils.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.columnar +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.types._ @@ -37,6 +38,8 @@ object ColumnarDataTypeUtils { case PhysicalStructType(fields) => StructType(fields) case PhysicalMapType(keyType, valueType, valueContainsNull) => MapType(keyType, valueType, valueContainsNull) - case _ => throw new UnsupportedOperationException() + case unsupportedType => throw new SparkUnsupportedOperationException( + errorClass = 
"_LEGACY_ERROR_TEMP_3162", + messageParameters = Map("type" -> unsupportedType.toString)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org