This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c6dccc7dd41 [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file c6dccc7dd41 is described below commit c6dccc7dd412a95007f5bb2584d69b85ff9ebf8e Author: panbingkun <pbk1...@gmail.com> AuthorDate: Thu May 19 20:39:35 2022 +0300 [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file ### What changes were proposed in this pull request? In the PR, I propose to use the INVALID_BUCKET_FILE error class for an invalid bucket file. ### Why are the changes needed? Porting the execution error for an invalid bucket file to the new error framework should improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #36603 from panbingkun/SPARK-39163. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 3 +++ .../spark/sql/errors/QueryExecutionErrors.scala | 5 ++++ .../spark/sql/execution/DataSourceScanExec.scala | 4 ++-- .../sql/errors/QueryExecutionErrorsSuite.scala | 28 ++++++++++++++++++++-- .../adaptive/AdaptiveQueryExecSuite.scala | 6 ++--- .../spark/sql/sources/BucketedReadSuite.scala | 23 ------------------ 6 files changed, 38 insertions(+), 31 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index e4ee09ea8a7..1a139c018e8 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -115,6 +115,9 @@ "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : { "message" : [ "The index <indexValue> is out of bounds. The array has <arraySize> elements. To return NULL instead, use `try_element_at`. If necessary set <config> to \"false\" to bypass this error." 
] }, + "INVALID_BUCKET_FILE" : { + "message" : [ "Invalid bucket file: <path>" ] + }, "INVALID_FIELD_NAME" : { "message" : [ "Field name <fieldName> is invalid: <path> is not a struct." ], "sqlState" : "42000" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index a155b0694b5..1e664100545 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2000,4 +2000,9 @@ object QueryExecutionErrors extends QueryErrorsBase { s"add ${toSQLValue(amount, IntegerType)} $unit to " + s"${toSQLValue(DateTimeUtils.microsToInstant(micros), TimestampType)}")) } + + def invalidBucketFile(path: String): Throwable = { + new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = Array(path), + cause = null) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index f7b627cef08..f5d349d975f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators @@ -618,8 +619,7 @@ case class FileSourceScanExec( 
}.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file - .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}")) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath)) } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index bdc0772c1de..bbf6c0dda79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.errors -import java.io.IOException -import java.net.URL +import java.io.{File, IOException} +import java.net.{URI, URL} import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData} import java.util.{Locale, Properties, ServiceConfigurationError} @@ -587,6 +587,30 @@ class QueryExecutionErrorsSuite JdbcDialects.unregisterDialect(testH2DialectUnrecognizedSQLType) } + + test("INVALID_BUCKET_FILE: error if there exists any malformed bucket files") { + val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)). + toDF("i", "j", "k").as("df1") + + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i"). 
+ saveAsTable("bucketed_table") + val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath + val tableDir = new File(warehouseFilePath, "bucketed_table") + Utils.deleteRecursively(tableDir) + df1.write.parquet(tableDir.getAbsolutePath) + + val aggregated = spark.table("bucketed_table").groupBy("i").count() + + checkErrorClass( + exception = intercept[SparkException] { + aggregated.count() + }, + errorClass = "INVALID_BUCKET_FILE", + msg = "Invalid bucket file: .+", + matchMsg = true) + } + } } class FakeFileSystemSetPermission extends LocalFileSystem { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 0f71c028962..51d02f4a7c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -860,10 +860,8 @@ class AdaptiveQueryExecSuite val error = intercept[SparkException] { aggregated.count() } - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file - assert(error.getErrorClass === "INTERNAL_ERROR") - assert(error.getCause.toString contains "Invalid bucket file") - assert(error.getCause.getSuppressed.size === 0) + assert(error.getErrorClass === "INVALID_BUCKET_FILE") + assert(error.getMessage contains "Invalid bucket file") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index c39edbc5860..fc7c4e5761b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -17,12 +17,8 @@ package org.apache.spark.sql.sources -import java.io.File -import java.net.URI - import scala.util.Random 
-import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions @@ -37,7 +33,6 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} -import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet class BucketedReadWithoutHiveSupportSuite @@ -833,24 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } - test("error if there exists any malformed bucket files") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath - val tableDir = new File(warehouseFilePath, "bucketed_table") - Utils.deleteRecursively(tableDir) - df1.write.parquet(tableDir.getAbsolutePath) - - val aggregated = spark.table("bucketed_table").groupBy("i").count() - val e = intercept[SparkException] { - aggregated.count() - } - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file - assert(e.getErrorClass === "INTERNAL_ERROR") - assert(e.getCause.toString contains "Invalid bucket file") - } - } - test("disable bucketing when the output doesn't contain all bucketing columns") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org