This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b0f92a9261a3 [SPARK-49002][SQL] Consistently handle invalid locations in WAREHOUSE/SCHEMA/TABLE/PARTITION/DIRECTORY
b0f92a9261a3 is described below

commit b0f92a9261a34defa016361b1321f634f0516347
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu Aug 1 23:29:35 2024 +0800

    [SPARK-49002][SQL] Consistently handle invalid locations in WAREHOUSE/SCHEMA/TABLE/PARTITION/DIRECTORY

    ### What changes were proposed in this pull request?

    This pull request makes the handling of invalid location/path values consistent across all database objects.

    Before this PR, we checked only for `null` and `""`, and only in a small group of operations such as `SetNamespaceLocation` and `CreateNamespace`. Other commands and queries that involve locations were not verified at all, and invalid inputs other than `null` and `""` were not reported with suitable error classes.

    In this PR, we add a try-catch block that rethrows all other invalid location inputs as INVALID_LOCATION errors, while `null` and `""` keep the existing INVALID_EMPTY_LOCATION error class. Locations are now validated in all operations on databases, tables, partitions, and raw paths.

    ### Why are the changes needed?

    For better and consistent error reporting on invalid locations/paths.

    ### Does this PR introduce _any_ user-facing change?

    Yes, the `IllegalArgumentException` thrown by path parsing is replaced with an `INVALID_LOCATION` error.

    ### How was this patch tested?

    New tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

Closes #47485 from yaooqinn/SPARK-49002.

Authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Kent Yao <y...@apache.org>
---
 .../src/main/resources/error/error-conditions.json |  6 ++
 .../catalyst/catalog/ExternalCatalogUtils.scala    | 13 +++-
 .../spark/sql/errors/QueryExecutionErrors.scala    | 10 ++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 10 +--
 .../spark/sql/execution/datasources/rules.scala    | 14 ++--
 .../datasources/v2/DataSourceV2Strategy.scala      |  9 ---
 .../analyzer-results/sql-on-files.sql.out          | 80 ++++++++++++++++++++
 .../resources/sql-tests/inputs/sql-on-files.sql    |  5 ++
 .../sql-tests/results/sql-on-files.sql.out         | 88 ++++++++++++++++++++++
 .../AlterNamespaceSetLocationSuiteBase.scala       | 15 ++++
 10 files changed, 220 insertions(+), 30 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index de127d4a7bf0..d4bcd854a507 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2503,6 +2503,12 @@
     },
     "sqlState" : "42K0E"
   },
+  "INVALID_LOCATION" : {
+    "message" : [
+      "The location name cannot be an invalid URI, but `<location>` was given."
+    ],
+    "sqlState" : "42K05"
+  },
   "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
     "message" : [
       "The operator expects a deterministic expression, but the actual expression is <sqlExprs>."
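For context before the code diffs: the core of this patch is the validation added to `CatalogUtils.stringToURI` in the ExternalCatalogUtils.scala diff below. The following is a rough, self-contained sketch of that pattern, assuming only hadoop-common on the classpath; `InvalidLocationDemo` and its plain `IllegalArgumentException`s are illustrative stand-ins for Spark's `SparkIllegalArgumentException` raised via `QueryExecutionErrors.invalidLocationError`, not the actual patched code.

    import java.net.URI

    import org.apache.hadoop.fs.Path

    object InvalidLocationDemo {
      // Hypothetical stand-in for CatalogUtils.stringToURI as patched below.
      def stringToURI(str: String): URI = {
        if (str == null || str.isEmpty) {
          // null/"" keep the pre-existing INVALID_EMPTY_LOCATION error class.
          throw new IllegalArgumentException(s"INVALID_EMPTY_LOCATION: `$str`")
        }
        try {
          new Path(str).toUri
        } catch {
          // Hadoop's Path rejects inputs such as "file:tmp" (a relative path
          // inside an absolute URI); rethrow under INVALID_LOCATION instead of
          // leaking the raw IllegalArgumentException to the user.
          case e: IllegalArgumentException =>
            throw new IllegalArgumentException(s"INVALID_LOCATION: `$str`", e)
        }
      }

      def main(args: Array[String]): Unit = {
        println(stringToURI("file:/tmp")) // ok: prints file:/tmp
        println(stringToURI("file:tmp"))  // throws INVALID_LOCATION
      }
    }

Hadoop's `Path` constructor wraps the underlying `URISyntaxException` ("Relative path in absolute URI") in an `IllegalArgumentException`, which is why `file:tmp` in the tests below now surfaces as `INVALID_LOCATION` rather than a bare `IllegalArgumentException`.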
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 749c9df40f14..4dfd4cf7ca4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
@@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BasePredicate, BoundReference, Expression, Predicate}
 import org.apache.spark.sql.catalyst.expressions.Hex.unhexDigits
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -280,7 +281,15 @@ object CatalogUtils {
    * @return the URI of the path
    */
  def stringToURI(str: String): URI = {
-    new Path(str).toUri
+    if (StringUtils.isEmpty(str)) {
+      throw QueryExecutionErrors.invalidLocationError(str, "INVALID_EMPTY_LOCATION")
+    }
+    try {
+      new Path(str).toUri
+    } catch {
+      case e: IllegalArgumentException =>
+        throw QueryExecutionErrors.invalidLocationError(str, "INVALID_LOCATION", e)
+    }
  }
 
  def makeQualifiedDBObjectPath(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bdd53219de40..7c44863e6956 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2522,10 +2522,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
     )
   }
 
-  def invalidEmptyLocationError(location: String): SparkIllegalArgumentException = {
+  def invalidLocationError(
+      location: String,
+      errorClass: String,
+      cause: Throwable = null): SparkIllegalArgumentException = {
     new SparkIllegalArgumentException(
-      errorClass = "INVALID_EMPTY_LOCATION",
-      messageParameters = Map("location" -> location))
+      errorClass = errorClass,
+      messageParameters = Map("location" -> location),
+      cause = cause)
   }
 
   def malformedProtobufMessageDetectedInMessageParsingError(e: Throwable): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 1f9419c41b74..a460634ad8a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.SparkException
 import org.apache.spark.internal.LogKeys.CONFIG
 import org.apache.spark.internal.MDC
@@ -32,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDe
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
@@ -134,9 +132,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       AlterDatabasePropertiesCommand(db, properties)
 
     case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
-      if (StringUtils.isEmpty(location)) {
-        throw QueryExecutionErrors.invalidEmptyLocationError(location)
-      }
       AlterDatabaseSetLocationCommand(db, location)
 
     case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
@@ -240,9 +235,6 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
       val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
       val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
-      if (location.isDefined && location.get.isEmpty) {
-        throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
-      }
       CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)
 
     case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) if conf.useV1Command =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 9279862c9196..f9fe5390e16b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import scala.collection.mutable.{HashMap, HashSet}
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
@@ -55,7 +56,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     val ident = unresolved.multipartIdentifier
     val dataSource = DataSource(
       sparkSession,
-      paths = Seq(ident.last),
+      paths = Seq(CatalogUtils.stringToURI(ident.last).toString),
       className = ident.head,
       options = unresolved.options.asScala.toMap)
     // `dataSource.providingClass` may throw ClassNotFoundException, the caller side will try-catch
@@ -67,12 +68,6 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         errorClass = "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY",
         messageParameters = Map("dataSourceType" -> ident.head))
     }
-    if (isFileFormat && ident.last.isEmpty) {
-      unresolved.failAnalysis(
-        errorClass = "INVALID_EMPTY_LOCATION",
-        messageParameters = Map("location" -> ident.last))
-    }
-
     dataSource
   }
 
@@ -95,6 +90,11 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         LogicalRelation(ds.resolveRelation())
       } catch {
         case _: ClassNotFoundException => u
+        case e: SparkIllegalArgumentException if e.getErrorClass != null =>
+          u.failAnalysis(
+            errorClass = e.getErrorClass,
+            messageParameters = e.getMessageParameters.asScala.toMap,
+            cause = e)
         case e: Exception if !e.isInstanceOf[AnalysisException] =>
           // the provider is valid, but failed to create a logical plan
           u.failAnalysis(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 7a668b75c3c7..4019357bc3a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.EXPR
@@ -374,9 +372,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
 
     case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
-      if (StringUtils.isEmpty(location)) {
-        throw QueryExecutionErrors.invalidEmptyLocationError(location)
-      }
       AlterNamespaceSetPropertiesExec(
         catalog.asNamespaceCatalog,
         ns,
@@ -389,10 +384,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
     case CreateNamespace(ResolvedNamespace(catalog, ns, _), ifNotExists, properties) =>
-      val location = properties.get(SupportsNamespaces.PROP_LOCATION)
-      if (location.isDefined && location.get.isEmpty) {
-        throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
-      }
       val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
         properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
       }.getOrElse(properties)
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-on-files.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-on-files.sql.out
index b098a9758fe4..2899b6c1b0be 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-on-files.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-on-files.sql.out
@@ -34,6 +34,26 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM parquet.`file:tmp`
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 32,
+    "fragment" : "parquet.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM parquet.`/file/not/found`
 -- !query analysis
@@ -89,6 +109,26 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM orc.`file:tmp`
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "orc.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM orc.`/file/not/found`
 -- !query analysis
@@ -144,6 +184,26 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM csv.`file:tmp`
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "csv.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM csv.`/file/not/found`
 -- !query analysis
@@ -199,6 +259,26 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM json.`file:tmp`
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 29,
+    "fragment" : "json.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM json.`/file/not/found`
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-on-files.sql b/sql/core/src/test/resources/sql-tests/inputs/sql-on-files.sql
index 8a00e4400e6b..2b45a5060c99 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/sql-on-files.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/sql-on-files.sql
@@ -2,6 +2,8 @@ CREATE DATABASE IF NOT EXISTS sql_on_files;
 -- Parquet
 CREATE TABLE sql_on_files.test_parquet USING PARQUET AS SELECT 1;
 SELECT * FROM parquet.``;
+SELECT * FROM parquet.`file:tmp`;
+
 SELECT * FROM parquet.`/file/not/found`;
 SELECT * FROM parquet.`${spark.sql.warehouse.dir}/sql_on_files.db/test_parquet`;
 DROP TABLE sql_on_files.test_parquet;
@@ -9,6 +11,7 @@ DROP TABLE sql_on_files.test_parquet;
 -- ORC
 CREATE TABLE sql_on_files.test_orc USING ORC AS SELECT 1;
 SELECT * FROM orc.``;
+SELECT * FROM orc.`file:tmp`;
 SELECT * FROM orc.`/file/not/found`;
 SELECT * FROM orc.`${spark.sql.warehouse.dir}/sql_on_files.db/test_orc`;
 DROP TABLE sql_on_files.test_orc;
@@ -16,6 +19,7 @@ DROP TABLE sql_on_files.test_orc;
 -- CSV
 CREATE TABLE sql_on_files.test_csv USING CSV AS SELECT 1;
 SELECT * FROM csv.``;
+SELECT * FROM csv.`file:tmp`;
 SELECT * FROM csv.`/file/not/found`;
 SELECT * FROM csv.`${spark.sql.warehouse.dir}/sql_on_files.db/test_csv`;
 DROP TABLE sql_on_files.test_csv;
@@ -23,6 +27,7 @@ DROP TABLE sql_on_files.test_csv;
 -- JSON
 CREATE TABLE sql_on_files.test_json USING JSON AS SELECT 1;
 SELECT * FROM json.``;
+SELECT * FROM json.`file:tmp`;
 SELECT * FROM json.`/file/not/found`;
 SELECT * FROM json.`${spark.sql.warehouse.dir}/sql_on_files.db/test_json`;
 DROP TABLE sql_on_files.test_json;
diff --git a/sql/core/src/test/resources/sql-tests/results/sql-on-files.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-on-files.sql.out
index fc8f44bc22fe..73eef8fe74f2 100644
--- a/sql/core/src/test/resources/sql-tests/results/sql-on-files.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/sql-on-files.sql.out
@@ -37,6 +37,28 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM parquet.`file:tmp`
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 32,
+    "fragment" : "parquet.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM parquet.`/file/not/found`
 -- !query schema
@@ -98,6 +120,28 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM orc.`file:tmp`
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "orc.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM orc.`/file/not/found`
 -- !query schema
@@ -159,6 +203,28 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM csv.`file:tmp`
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "csv.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM csv.`/file/not/found`
 -- !query schema
@@ -220,6 +286,28 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM json.`file:tmp`
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_LOCATION",
+  "sqlState" : "42K05",
+  "messageParameters" : {
+    "location" : "file:tmp"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 29,
+    "fragment" : "json.`file:tmp`"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM json.`/file/not/found`
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetLocationSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetLocationSuiteBase.scala
index 6427338a6c52..2d4277e5499e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetLocationSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterNamespaceSetLocationSuiteBase.scala
@@ -56,6 +56,21 @@ trait AlterNamespaceSetLocationSuiteBase extends QueryTest with DDLCommandTestUt
     }
   }
 
+  test("Invalid location string") {
+    val ns = s"$catalog.$namespace"
+    withNamespace(ns) {
+      sql(s"CREATE NAMESPACE $ns")
+      val sqlText = s"ALTER NAMESPACE $ns SET LOCATION 'file:tmp'"
+      val e = intercept[SparkIllegalArgumentException] {
+        sql(sqlText)
+      }
+      checkError(
+        exception = e,
+        errorClass = "INVALID_LOCATION",
+        parameters = Map("location" -> "file:tmp"))
+    }
+  }
+
   test("Namespace does not exist") {
     val ns = "not_exist"
     val e = intercept[AnalysisException] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org