This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 88d4681dd6ab [SPARK-46950][CORE][SQL] Align `not available codec` error-class 88d4681dd6ab is described below commit 88d4681dd6ab37d65d704b0119ba2a54801fffde Author: panbingkun <panbing...@baidu.com> AuthorDate: Sat Feb 3 10:21:58 2024 +0300 [SPARK-46950][CORE][SQL] Align `not available codec` error-class ### What changes were proposed in this pull request? The pr aims to align `not available codec` error-class, includes: - In `core`, convert `CODEC_NOT_AVAILABLE` to `CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION`. - In `sql`, use `CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION` to cover `IllegalArgumentException`. ### Why are the changes needed? When I was solving a case where `the compression option` is `null`, as follows: <img width="1423" alt="image" src="https://github.com/apache/spark/assets/15246973/0e2abdad-0c1c-4ade-9f48-014bc4372fc6"> I found that we could first complete the extraction logic for `the codec error`. At the same time, I found that there was already a `CODEC_NOT_AVAILABLE` error in the `error-classes.json`, but it had some differences from the error prompt in `SQL` for compress. Therefore, I proposed using `CODEC_NOT_AVAILABLE` as `a parent class error` and adding `two subclass errors`: `WITH_AVAILABLE_CODECS_SUGGESTION` and `WITH_CONF_SUGGESTION`. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? - Update existing UT. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44992 from panbingkun/codec_improve. 
Lead-authored-by: panbingkun <panbing...@baidu.com> Co-authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 14 +++++++- .../org/apache/spark/errors/SparkCoreErrors.scala | 13 ++++++- .../org/apache/spark/io/CompressionCodec.scala | 9 ++--- .../apache/spark/io/CompressionCodecSuite.scala | 2 +- ...r-conditions-codec-not-available-error-class.md | 41 ++++++++++++++++++++++ docs/sql-error-conditions.md | 6 ++-- .../sql/catalyst/util/CompressionCodecs.scala | 7 ++-- .../spark/sql/errors/QueryExecutionErrors.scala | 8 +++++ .../sql/execution/datasources/orc/OrcOptions.scala | 4 +-- .../datasources/parquet/ParquetOptions.scala | 5 +-- .../ParquetCompressionCodecPrecedenceSuite.scala | 14 +++++--- .../sql/execution/datasources/text/TextSuite.scala | 18 ++++++---- 12 files changed, 112 insertions(+), 29 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 6d88f5ee511c..8399311cbfc4 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -441,8 +441,20 @@ }, "CODEC_NOT_AVAILABLE" : { "message" : [ - "The codec <codecName> is not available. Consider to set the config <configKey> to <configVal>." + "The codec <codecName> is not available." ], + "subClass" : { + "WITH_AVAILABLE_CODECS_SUGGESTION" : { + "message" : [ + "Available codecs are <availableCodecs>." + ] + }, + "WITH_CONF_SUGGESTION" : { + "message" : [ + "Consider to set the config <configKey> to <configVal>." 
+ ] + } + }, "sqlState" : "56038" }, "CODEC_SHORT_NAME_NOT_FOUND" : { diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala index 641310f2a0c5..a131f8233b0d 100644 --- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala +++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala @@ -24,7 +24,9 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException} +import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, TaskNotSerializableException} +import org.apache.spark.internal.config.IO_COMPRESSION_CODEC +import org.apache.spark.io.CompressionCodec.FALLBACK_COMPRESSION_CODEC import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException} import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager} @@ -490,6 +492,15 @@ private[spark] object SparkCoreErrors { cause = null) } + def codecNotAvailableError(codecName: String): Throwable = { + new SparkIllegalArgumentException( + errorClass = "CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION", + messageParameters = Map( + "codecName" -> codecName, + "configKey" -> toConf(IO_COMPRESSION_CODEC.key), + "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC))) + } + private def quoteByDefault(elem: String): String = { "\"" + elem + "\"" } diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index cc96437a1d21..07e694b6c5b0 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ 
-28,7 +28,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.{SparkConf, SparkIllegalArgumentException} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.errors.SparkCoreErrors.{toConf, toConfVal} +import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.config._ import org.apache.spark.util.Utils @@ -92,12 +92,7 @@ private[spark] object CompressionCodec { } catch { case _: ClassNotFoundException | _: IllegalArgumentException => None } - codec.getOrElse(throw new SparkIllegalArgumentException( - errorClass = "CODEC_NOT_AVAILABLE", - messageParameters = Map( - "codecName" -> codecName, - "configKey" -> toConf(IO_COMPRESSION_CODEC.key), - "configVal" -> toConfVal(FALLBACK_COMPRESSION_CODEC)))) + codec.getOrElse(throw SparkCoreErrors.codecNotAvailableError(codecName)) } /** diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala index 9c9fac0d4833..729fcecff120 100644 --- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala @@ -131,7 +131,7 @@ class CompressionCodecSuite extends SparkFunSuite { exception = intercept[SparkIllegalArgumentException] { CompressionCodec.createCodec(conf, "foobar") }, - errorClass = "CODEC_NOT_AVAILABLE", + errorClass = "CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION", parameters = Map( "codecName" -> "foobar", "configKey" -> "\"spark.io.compression.codec\"", diff --git a/docs/sql-error-conditions-codec-not-available-error-class.md b/docs/sql-error-conditions-codec-not-available-error-class.md new file mode 100644 index 000000000000..bb93f56206ba --- /dev/null +++ b/docs/sql-error-conditions-codec-not-available-error-class.md @@ -0,0 +1,41 @@ +--- +layout: global +title: CODEC_NOT_AVAILABLE error class +displayTitle: CODEC_NOT_AVAILABLE error class +license: | + 
Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +<!-- + DO NOT EDIT THIS FILE. + It was generated automatically by `org.apache.spark.SparkThrowableSuite`. +--> + +SQLSTATE: 56038 + +The codec `<codecName>` is not available. + +This error class has the following derived error classes: + +## WITH_AVAILABLE_CODECS_SUGGESTION + +Available codecs are `<availableCodecs>`. + +## WITH_CONF_SUGGESTION + +Consider to set the config `<configKey>` to `<configVal>`. + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index c704b1c10c46..359d8645ef20 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -372,11 +372,13 @@ If this problem persists, you may consider using `rdd.checkpoint()` instead, whi `MapObjects` does not support the class `<cls>` as resulting collection. -### CODEC_NOT_AVAILABLE +### [CODEC_NOT_AVAILABLE](sql-error-conditions-codec-not-available-error-class.html) SQLSTATE: 56038 -The codec `<codecName>` is not available. Consider to set the config `<configKey>` to `<configVal>`. +The codec `<codecName>` is not available. 
+ +For more details see [CODEC_NOT_AVAILABLE](sql-error-conditions-codec-not-available-error-class.html) ### CODEC_SHORT_NAME_NOT_FOUND diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala index a1d6446cc105..860e73a84598 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala @@ -22,6 +22,7 @@ import java.util.Locale import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.Utils object CompressionCodecs { @@ -42,9 +43,9 @@ object CompressionCodecs { } codecName } catch { - case e: ClassNotFoundException => - throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.") + case _: ClassNotFoundException => + throw QueryExecutionErrors.codecNotAvailableError( + codecName, shortCompressionCodecNames.keys.mkString(", ")) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index af5cafdc8a3a..b168347eab20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2711,4 +2711,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE errorClass = "INVALID_WRITER_COMMIT_MESSAGE", messageParameters = Map("details" -> details)) } + + def codecNotAvailableError(codecName: String, availableCodecs: String): Throwable = { + new SparkIllegalArgumentException( + errorClass = 
"CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION", + messageParameters = Map( + "codecName" -> codecName, + "availableCodecs" -> availableCodecs)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala index 4bed600fa4eb..4127fc547c5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala @@ -23,6 +23,7 @@ import org.apache.orc.OrcConf.COMPRESS import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf /** @@ -53,8 +54,7 @@ class OrcOptions( .toLowerCase(Locale.ROOT) if (!shortOrcCompressionCodecNames.contains(codecName)) { val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT)) - throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. 
Available codecs are ${availableCodecs.mkString(", ")}.") + throw QueryExecutionErrors.codecNotAvailableError(codecName, availableCodecs.mkString(", ")) } shortOrcCompressionCodecNames(codecName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 1f022f8d369e..e795d156d764 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -23,6 +23,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf /** @@ -55,8 +56,8 @@ class ParquetOptions( if (!shortParquetCompressionCodecNames.contains(codecName)) { val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT)) - throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. 
Available codecs are ${availableCodecs.mkString(", ")}.") + throw QueryExecutionErrors.codecNotAvailableError( + codecName, availableCodecs.mkString(", ")) } shortParquetCompressionCodecNames(codecName).name() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index 040c127a43d1..28644720d043 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -120,10 +121,15 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("Create table with unknown compression") { Seq(true, false).foreach { isPartitioned => - val exception = intercept[IllegalArgumentException] { - checkCompressionCodec("aa", isPartitioned) - } - assert(exception.getMessage.contains("Codec [aa] is not available")) + checkError( + exception = intercept[SparkIllegalArgumentException] { + checkCompressionCodec("aa", isPartitioned) + }, + errorClass = "CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION", + parameters = Map( + "codecName" -> "aa", + "availableCodecs" -> ("brotli, uncompressed, lzo, snappy, " + + "lz4_raw, none, zstd, lz4, gzip"))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 6e3210f8c177..25aa6def052b 100644 
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -24,7 +24,7 @@ import java.nio.file.Files import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.{SparkConf, TestUtils} +import org.apache.spark.{SparkConf, SparkIllegalArgumentException, TestUtils} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.{BZIP2, DEFLATE, GZIP, NONE} import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite @@ -105,12 +105,18 @@ abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFi verifyFrame(spark.read.text(tempDirPath)) } - val errMsg = intercept[IllegalArgumentException] { - val tempDirPath = Utils.createTempDir().getAbsolutePath - testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath) + withTempDir { dir => + checkError( + exception = intercept[SparkIllegalArgumentException] { + testDf.write.option("compression", "illegal").mode( + SaveMode.Overwrite).text(dir.getAbsolutePath) + }, + errorClass = "CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION", + parameters = Map( + "codecName" -> "illegal", + "availableCodecs" -> "bzip2, deflate, uncompressed, snappy, none, lz4, gzip") + ) } - assert(errMsg.getMessage.contains("Codec [illegal] is not available. " + - "Known codecs are")) } test("SPARK-13543 Write the output as uncompressed via option()") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org