This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5d3f3365c0b  [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file

5d3f3365c0b is described below

commit 5d3f3365c0b7d4515fd97d0ff7b7b29db69b2faf
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Mon Jun 20 11:04:14 2022 +0900

    [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file

    ### What changes were proposed in this pull request?
    In the PR, I propose to use the INVALID_BUCKET_FILE error class for an invalid bucket file.
    This is a backport of https://github.com/apache/spark/pull/36603.

    ### Why are the changes needed?
    Porting the execution error for an invalid bucket file to the new error framework should improve the user experience with Spark SQL.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    UT: a new test in QueryExecutionErrorsSuite, plus updated assertions in AdaptiveQueryExecSuite.

    Closes #36913 from panbingkun/branch-3.3-SPARK-39163.

    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 core/src/main/resources/error/error-classes.json  |  3 +++
 .../spark/sql/errors/QueryExecutionErrors.scala   |  5 +++++
 .../spark/sql/execution/DataSourceScanExec.scala  |  4 ++--
 .../sql/errors/QueryExecutionErrorsSuite.scala    | 25 ++++++++++++++++++++++
 .../adaptive/AdaptiveQueryExecSuite.scala         |  8 +++----
 .../spark/sql/sources/BucketedReadSuite.scala     | 21 ------------------
 6 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 4eb3a4e8e1e..fc712fc9c52 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -95,6 +95,9 @@
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
     "message" : [ "The index <indexValue> is out of bounds. The array has <arraySize> elements. Use `try_element_at` to tolerate accessing element at invalid index and return NULL instead. If necessary set <config> to \"false\" to bypass this error." ]
   },
+  "INVALID_BUCKET_FILE" : {
+    "message" : [ "Invalid bucket file: <path>" ]
+  },
   "INVALID_FIELD_NAME" : {
     "message" : [ "Field name <fieldName> is invalid: <path> is not a struct." ],
     "sqlState" : "42000"
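
[Editor's note: for readers unfamiliar with the error-class mechanism, the <path> placeholder in the JSON message template is filled from the messageParameters passed to SparkException. A minimal sketch of how such an exception surfaces; the file path is a made-up example, while the SparkException(errorClass, messageParameters, cause) constructor and getErrorClass are the same APIs the new invalidBucketFile helper in the next hunk uses:]

    import org.apache.spark.SparkException

    // Construct the same error the new helper produces; Spark's error
    // framework renders the "Invalid bucket file: <path>" template with
    // the supplied parameter.
    val e = new SparkException(
      errorClass = "INVALID_BUCKET_FILE",
      messageParameters = Array("/tmp/warehouse/bucketed_table/part-00000.parquet"), // hypothetical path
      cause = null)

    assert(e.getErrorClass == "INVALID_BUCKET_FILE")
    println(e.getMessage) // "Invalid bucket file: /tmp/warehouse/bucketed_table/part-00000.parquet"
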
], "sqlState" : "42000" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 6c6139d2ccc..161bfd3c03d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2075,4 +2075,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { new SparkException(errorClass = "NULL_COMPARISON_RESULT", messageParameters = Array(), cause = null) } + + def invalidBucketFile(path: String): Throwable = { + new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = Array(path), + cause = null) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 1ec93a614b7..9e8ae9a714d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators @@ -592,8 +593,7 @@ case class FileSourceScanExec( }.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file - .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}")) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath)) } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index 73c5b12849a..21acea53ed0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.errors +import java.io.File +import java.net.URI + import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.execution.datasources.orc.OrcTest @@ -286,4 +289,26 @@ class QueryExecutionErrorsSuite extends QueryTest assert(e2.getMessage === "The save mode NULL is not supported for: an existent path.") } } + + test("INVALID_BUCKET_FILE: error if there exists any malformed bucket files") { + val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)). + toDF("i", "j", "k").as("df1") + + withTable("bucketed_table") { + df1.write.format("parquet").bucketBy(8, "i"). 
+ saveAsTable("bucketed_table") + val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath + val tableDir = new File(warehouseFilePath, "bucketed_table") + Utils.deleteRecursively(tableDir) + df1.write.parquet(tableDir.getAbsolutePath) + + val aggregated = spark.table("bucketed_table").groupBy("i").count() + + val e = intercept[SparkException] { + aggregated.count() + } + assert(e.getErrorClass === "INVALID_BUCKET_FILE") + assert(e.getMessage.matches("Invalid bucket file: .+")) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index f068ab8a4e2..831a998dfaa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,6 +23,7 @@ import java.net.URI import org.apache.logging.log4j.Level import org.scalatest.PrivateMethodTester +import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} @@ -856,12 +857,11 @@ class AdaptiveQueryExecSuite df1.write.parquet(tableDir.getAbsolutePath) val aggregated = spark.table("bucketed_table").groupBy("i").count() - val error = intercept[IllegalStateException] { + val error = intercept[SparkException] { aggregated.count() } - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file - assert(error.toString contains "Invalid bucket file") - assert(error.getSuppressed.size === 0) + assert(error.getErrorClass === "INVALID_BUCKET_FILE") + assert(error.getMessage contains "Invalid bucket file") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 8d593a55a7e..bdd642a1f90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -17,9 +17,6 @@ package org.apache.spark.sql.sources -import java.io.File -import java.net.URI - import scala.util.Random import org.apache.spark.sql._ @@ -36,7 +33,6 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} -import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet class BucketedReadWithoutHiveSupportSuite @@ -832,23 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } } - test("error if there exists any malformed bucket files") { - withTable("bucketed_table") { - df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath - val tableDir = new File(warehouseFilePath, "bucketed_table") - Utils.deleteRecursively(tableDir) - df1.write.parquet(tableDir.getAbsolutePath) - - val aggregated = spark.table("bucketed_table").groupBy("i").count() - val e = intercept[IllegalStateException] { - aggregated.count() - } - // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 8d593a55a7e..bdd642a1f90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -17,9 +17,6 @@

 package org.apache.spark.sql.sources

-import java.io.File
-import java.net.URI
-
 import scala.util.Random

 import org.apache.spark.sql._
@@ -36,7 +33,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet

 class BucketedReadWithoutHiveSupportSuite
@@ -832,23 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }

-  test("error if there exists any malformed bucket files") {
-    withTable("bucketed_table") {
-      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
-      val tableDir = new File(warehouseFilePath, "bucketed_table")
-      Utils.deleteRecursively(tableDir)
-      df1.write.parquet(tableDir.getAbsolutePath)
-
-      val aggregated = spark.table("bucketed_table").groupBy("i").count()
-      val e = intercept[IllegalStateException] {
-        aggregated.count()
-      }
-      // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
-      assert(e.toString contains "Invalid bucket file")
-    }
-  }
-
   test("disable bucketing when the output doesn't contain all bucketing columns") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org