This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b913e015e76  [SPARK-37942][CORE][SQL] Migrate error classes
b913e015e76 is described below

commit b913e015e76b60753d227a25ff6c4d49d77ed4ae
Author: narek_karapetian <narek.karapetia...@yandex.ru>
AuthorDate: Wed May 10 13:59:59 2023 +0300

    [SPARK-37942][CORE][SQL] Migrate error classes

    ### What changes were proposed in this pull request?
    Rename error classes:
    1. QueryCompilationErrors.cannotReadCorruptedTablePropertyError: `_LEGACY_ERROR_TEMP_1091` -> `INSUFFICIENT_TABLE_PROPERTY` (with subclasses) + add test case + refactor the CatalogTable.readLargeTableProp function
    2. QueryCompilationErrors.notSupportedInJDBCCatalog: `_LEGACY_ERROR_TEMP_1119` -> `NOT_SUPPORTED_IN_JDBC_CATALOG` (with subclasses) + add test case
    3. QueryCompilationErrors.cannotSetJDBCNamespaceWithPropertyError + add test case
    4. QueryCompilationErrors.alterTableSerDePropertiesNotSupportedForV2TablesError: `_LEGACY_ERROR_TEMP_1124` -> `NOT_SUPPORTED_COMMAND_FOR_V2_TABLE` + add test case + update the test case in the `AlterTableSetSerdeSuite` test class
    5. QueryCompilationErrors.unsetNonExistentPropertyError: renamed to QueryCompilationErrors.unsetNonExistentPropertiesError + change the error message and code to include all the nonexistent keys + add test case
    6. QueryCompilationErrors.cannotUnsetJDBCNamespaceWithPropertyError: `_LEGACY_ERROR_TEMP_1119` -> `NOT_SUPPORTED_IN_JDBC_CATALOG` (not able to reproduce this error class from tests)

    ### Why are the changes needed?
    We should assign proper names to the `_LEGACY_ERROR_TEMP_*` error classes.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    By existing unit tests; additional test cases were added as well.

    Closes #41018 from NarekDW/SPARK-37942-migrate-errors.

    Lead-authored-by: narek_karapetian <narek.karapetia...@yandex.ru>
    Co-authored-by: Narek Karapetian <narek.karapetia...@yandex.ru>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 66 +++++++++++-------
 .../spark/sql/catalyst/catalog/interface.scala     | 27 ++++----
 .../spark/sql/errors/QueryCompilationErrors.scala  | 59 +++++++++-------
 .../apache/spark/sql/execution/command/ddl.scala   |  9 +--
 .../datasources/v2/jdbc/JDBCTableCatalog.scala     |  2 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 24 ++++++-
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 75 ++++++++++++++++++++
 .../command/v2/AlterTableSetSerdeSuite.scala       | 13 ++--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 51 +++++++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  8 ++-
 10 files changed, 252 insertions(+), 82 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index dc97a735b39..269e2e4cb7b 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -789,6 +789,23 @@
     ],
     "sqlState" : "21S01"
   },
+  "INSUFFICIENT_TABLE_PROPERTY" : {
+    "message" : [
+      "Can't find table property:"
+    ],
+    "subClass" : {
+      "MISSING_KEY" : {
+        "message" : [
+          "<key>."
+        ]
+      },
+      "MISSING_KEY_PART" : {
+        "message" : [
+          "<key>, <totalAmountOfParts> parts are expected."
+        ]
+      }
+    }
+  },
   "INTERNAL_ERROR" : {
     "message" : [
       "<message>"
@@ -1244,6 +1261,30 @@
     },
     "sqlState" : "42000"
   },
+  "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
+    "message" : [
+      "<cmd> is not supported for v2 tables."
+    ],
+    "sqlState" : "46110"
+  },
+  "NOT_SUPPORTED_IN_JDBC_CATALOG" : {
+    "message" : [
+      "Not supported command in JDBC catalog:"
+    ],
+    "subClass" : {
+      "COMMAND" : {
+        "message" : [
+          "<cmd>"
+        ]
+      },
+      "COMMAND_WITH_PROPERTY" : {
+        "message" : [
+          "<cmd> with property <property>."
+        ]
+      }
+    },
+    "sqlState" : "46110"
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
@@ -1670,6 +1711,11 @@
     ],
     "sqlState" : "42703"
   },
+  "UNSET_NONEXISTENT_PROPERTIES" : {
+    "message" : [
+      "Attempted to unset non-existent properties [<properties>] in table <table>."
+    ]
+  },
   "UNSUPPORTED_ARROWTYPE" : {
     "message" : [
       "Unsupported arrow type <typeName>."
@@ -2667,11 +2713,6 @@
       "Column statistics serialization is not supported for column <colName> of data type: <dataType>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1091" : {
-    "message" : [
-      "Cannot read table property '<key>' as it's corrupted.<details>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1097" : {
     "message" : [
       "The field for corrupt records must be string type and nullable."
@@ -2739,11 +2780,6 @@
       "Sources support continuous: <continuousSources>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1119" : {
-    "message" : [
-      "<cmd> is not supported in JDBC catalog."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1120" : {
     "message" : [
       "Unsupported NamespaceChange <changes> in JDBC catalog."
@@ -2764,11 +2800,6 @@
       "Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1124" : {
-    "message" : [
-      "<cmd> is not supported for v2 tables."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1125" : {
     "message" : [
       "Database from v1 session catalog is not specified."
@@ -3261,11 +3292,6 @@
       "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory <tablePath>. To allow overwriting the existing non-empty directory, set '<config>' to true."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1244" : {
-    "message" : [
-      "Attempted to unset non-existent property '<property>' in table '<table>'."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1245" : {
     "message" : [
       "ALTER TABLE CHANGE COLUMN is not supported for changing column '<originName>' with type '<originType>' to '<newName>' with type '<newType>'."
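For context on the `INSUFFICIENT_TABLE_PROPERTY` entries above and the `readLargeTableProp` refactor in the next diff: large table properties such as the data source schema are persisted split across a `<key>.numParts` counter and `<key>.part.<i>` chunks, because some metastores cap the length of a single property value. `readLargeTableProp` reassembles the chunks and now raises `MISSING_KEY` when the counter is absent and `MISSING_KEY_PART` when a chunk is absent. Below is a minimal, self-contained sketch of that read path, not Spark's actual implementation; `InsufficientTableProperty` is a hypothetical stand-in for the `AnalysisException` raised via `QueryCompilationErrors`:

```scala
object LargeTablePropSketch {
  // Hypothetical stand-in for AnalysisException carrying the new error classes.
  final case class InsufficientTableProperty(message: String)
    extends RuntimeException(message)

  def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
    // Fast path: the value was small enough to be stored under the key itself.
    props.get(key).orElse {
      if (props.exists { case (k, _) => k.startsWith(key) }) {
        props.get(s"$key.numParts") match {
          case None =>
            // INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY: chunks exist, counter is gone.
            throw InsufficientTableProperty(s"Can't find table property: $key.")
          case Some(numParts) =>
            val parts = (0 until numParts.toInt).map { i =>
              // INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART: one chunk is missing.
              props.getOrElse(s"$key.part.$i",
                throw InsufficientTableProperty(
                  s"Can't find table property: $key.part.$i, $numParts parts are expected."))
            }
            // All chunks present: concatenate them back into the original value.
            Some(parts.mkString)
        }
      } else {
        None // No trace of the key at all: simply absent, not corrupted.
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "spark.sql.sources.schema.numParts" -> "2",
      "spark.sql.sources.schema.part.0" -> "first-half-of-a-long-json-",
      "spark.sql.sources.schema.part.1" -> "schema-string")
    // Prints Some(first-half-of-a-long-json-schema-string).
    println(readLargeTableProp(props, "spark.sql.sources.schema"))
  }
}
```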
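The JSON above only defines templates; as a rough illustration (a simplified model of Spark's error-class framework, not its actual code), the parent message and the subclass message are joined and `<param>` placeholders are filled from the message parameters:

```scala
object ErrorClassRenderSketch {
  // Joined parent + subclass templates, copied from the error-classes.json entries above.
  private val templates = Map(
    "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND" ->
      "Not supported command in JDBC catalog: <cmd>",
    "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY" ->
      "Not supported command in JDBC catalog: <cmd> with property <property>.",
    "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY" ->
      "Can't find table property: <key>.")

  // Substitute every <name> placeholder with its parameter value.
  def render(errorClass: String, params: Map[String, String]): String =
    params.foldLeft(templates(errorClass)) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }

  def main(args: Array[String]): Unit = {
    // Prints: Not supported command in JDBC catalog: SET NAMESPACE with property "location".
    println(render(
      "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
      Map("cmd" -> "SET NAMESPACE", "property" -> "\"location\"")))
  }
}
```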
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 8bbf1f4f564..fce820c4927 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -497,21 +497,20 @@ object CatalogTable {
 
   def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
     props.get(key).orElse {
-      if (props.filterKeys(_.startsWith(key)).isEmpty) {
-        None
-      } else {
-        val numParts = props.get(s"$key.numParts")
-        if (numParts.isEmpty) {
-          throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(key)
-        } else {
-          val parts = (0 until numParts.get.toInt).map { index =>
-            props.getOrElse(s"$key.part.$index", {
-              throw QueryCompilationErrors.cannotReadCorruptedTablePropertyError(
-                key, s"Missing part $index, $numParts parts are expected.")
-            })
-          }
-          Some(parts.mkString)
+      if (props.exists { case (mapKey, _) => mapKey.startsWith(key) }) {
+        props.get(s"$key.numParts") match {
+          case None => throw QueryCompilationErrors.insufficientTablePropertyError(key)
+          case Some(numParts) =>
+            val parts = (0 until numParts.toInt).map { index =>
+              val keyPart = s"$key.part.$index"
+              props.getOrElse(keyPart, {
+                throw QueryCompilationErrors.insufficientTablePropertyPartError(keyPart, numParts)
+              })
+            }
+            Some(parts.mkString)
         }
+      } else {
+        None
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 1640c18a764..ad0a17ef4f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1033,10 +1033,19 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("colName" -> colName, "dataType" -> dataType.toString))
   }
 
-  def cannotReadCorruptedTablePropertyError(key: String, details: String = ""): Throwable = {
+  def insufficientTablePropertyError(key: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1091",
-      messageParameters = Map("key" -> key, "details" -> details))
+      errorClass = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY",
+      messageParameters = Map("key" -> toSQLConf(key)))
+  }
+
+  def insufficientTablePropertyPartError(
+      key: String, totalAmountOfParts: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART",
+      messageParameters = Map(
+        "key" -> toSQLConf(key),
+        "totalAmountOfParts" -> totalAmountOfParts))
   }
 
   def schemaFailToParseError(schema: String, e: Throwable): Throwable = {
@@ -1269,8 +1278,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   private def notSupportedInJDBCCatalog(cmd: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1119",
-      messageParameters = Map("cmd" -> cmd))
+      errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND",
+      messageParameters = Map("cmd" -> toSQLStmt(cmd)))
+  }
+
+  private def notSupportedInJDBCCatalog(cmd: String, property: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
+      messageParameters = Map(
+        "cmd" -> toSQLStmt(cmd),
+        "property" -> toSQLConf(property)))
   }
 
   def cannotCreateJDBCTableUsingProviderError(): Throwable = {
@@ -1285,16 +1302,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     notSupportedInJDBCCatalog("CREATE NAMESPACE ... LOCATION ...")
   }
 
-  def cannotCreateJDBCNamespaceWithPropertyError(k: String): Throwable = {
-    notSupportedInJDBCCatalog(s"CREATE NAMESPACE with property $k")
+  def cannotCreateJDBCNamespaceWithPropertyError(property: String): Throwable = {
+    notSupportedInJDBCCatalog("CREATE NAMESPACE", property)
   }
 
-  def cannotSetJDBCNamespaceWithPropertyError(k: String): Throwable = {
-    notSupportedInJDBCCatalog(s"SET NAMESPACE with property $k")
+  def cannotSetJDBCNamespaceWithPropertyError(property: String): Throwable = {
+    notSupportedInJDBCCatalog("SET NAMESPACE", property)
   }
 
-  def cannotUnsetJDBCNamespaceWithPropertyError(k: String): Throwable = {
-    notSupportedInJDBCCatalog(s"Remove NAMESPACE property $k")
+  def cannotUnsetJDBCNamespaceWithPropertyError(property: String): Throwable = {
+    notSupportedInJDBCCatalog("UNSET NAMESPACE", property)
   }
 
   def unsupportedJDBCNamespaceChangeInCatalogError(changes: Seq[NamespaceChange]): Throwable = {
@@ -1349,8 +1366,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   private def notSupportedForV2TablesError(cmd: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1124",
-      messageParameters = Map("cmd" -> cmd))
+      errorClass = "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE",
+      messageParameters = Map("cmd" -> toSQLStmt(cmd)))
   }
 
   def analyzeTableNotSupportedForV2TablesError(): Throwable = {
@@ -2402,12 +2419,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       "config" -> SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key))
   }
 
-  def unsetNonExistentPropertyError(property: String, table: TableIdentifier): Throwable = {
+  def unsetNonExistentPropertiesError(
+      properties: Seq[String], table: TableIdentifier): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1244",
+      errorClass = "UNSET_NONEXISTENT_PROPERTIES",
       messageParameters = Map(
-        "property" -> property,
-        "table" -> table.toString))
+        "properties" -> properties.map(toSQLId).mkString(", "),
+        "table" -> toSQLId(table.nameParts))
+    )
   }
 
   def alterTableChangeColumnNotSupportedForColumnTypeError(
@@ -3232,12 +3251,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("columnPath" -> toSQLId(path)))
   }
 
-  def notNullConstraintViolationStructFieldError(path: Seq[String]): Throwable = {
-    new AnalysisException(
-      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.STRUCT_FIELD",
-      messageParameters = Map("columnPath" -> toSQLId(path)))
-  }
-
   def invalidColumnOrFieldDataTypeError(
       name: Seq[String],
       dt: DataType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5eae3259729..b2f5c66a35a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -333,10 +333,11 @@ case class AlterTableUnsetPropertiesCommand(
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableRawMetadata(tableName)
     if (!ifExists) {
-      propKeys.foreach { k =>
-        if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
-          throw QueryCompilationErrors.unsetNonExistentPropertyError(k, table.identifier)
-        }
+      val nonexistentKeys = propKeys.filter(key => !table.properties.contains(key)
+        && key != TableCatalog.PROP_COMMENT)
+      if (nonexistentKeys.nonEmpty) {
+        throw QueryCompilationErrors.unsetNonExistentPropertiesError(
+          nonexistentKeys, table.identifier)
       }
     }
     // If comment is in the table property, we reset it to None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 4d9e1674871..1bc2d43d7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -66,7 +66,7 @@ class JDBCTableCatalog extends TableCatalog
     JdbcUtils.withConnection(options) { conn =>
       val schemaPattern = if (namespace.length == 1) namespace.head else null
       val rs = conn.getMetaData
-        .getTables(null, schemaPattern, "%", Array("TABLE"));
+        .getTables(null, schemaPattern, "%", Array("TABLE"))
       new Iterator[Identifier] {
         def hasNext = rs.next()
         def next() = Identifier.of(namespace, rs.getString("TABLE_NAME"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6f14b0971ca..f5ae5d499e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2316,6 +2316,20 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      testNotSupportedV2Command("ALTER TABLE",
+        s"$t SET SERDE 'test_serde'",
+        Some("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]"))
+      testNotSupportedV2Command("ALTER TABLE",
+        s"$t SET SERDEPROPERTIES ('a' = 'b')",
+        Some("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]"))
+    }
+  }
+
   test("CREATE VIEW") {
     val v = "testcat.ns1.ns2.v"
     checkError(
@@ -3258,13 +3272,17 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
+  private def testNotSupportedV2Command(
+      sqlCommand: String,
+      sqlParams: String,
+      expectedArgument: Option[String] = None): Unit = {
     checkError(
       exception = intercept[AnalysisException] {
         sql(s"$sqlCommand $sqlParams")
       },
-      errorClass = "_LEGACY_ERROR_TEMP_1124",
-      parameters = Map("cmd" -> sqlCommand))
+      errorClass = "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE",
+      sqlState = "46110",
+      parameters = Map("cmd" -> expectedArgument.getOrElse(sqlCommand)))
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 35e4dc360bf..6329ff02373 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -21,11 +21,13 @@ import org.apache.spark.SPARK_DOC_ROOT
 import org.apache.spark.sql.{AnalysisException, ClassData, IntegratedUDFTestUtils, QueryTest, Row}
 import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions.{array, from_json, grouping, grouping_id, lit, struct, sum, udf}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructField, StructType}
+import org.apache.spark.util.Utils
 
 case class StringLongClass(a: String, b: Long)
 
@@ -37,6 +39,7 @@ case class ArrayClass(arr: Seq[StringIntClass])
 
 class QueryCompilationErrorsSuite
   extends QueryTest
+  with QueryErrorsBase
   with SharedSparkSession {
   import testImplicits._
 
@@ -784,6 +787,78 @@ class QueryCompilationErrorsSuite
       sqlState = "42000",
       parameters = Map("extraction" -> "\"array(test)\""))
   }
+
+  test("CREATE NAMESPACE with LOCATION for JDBC catalog should throw an error") {
+    withTempDir { tempDir =>
+      val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+      Utils.classForName("org.h2.Driver")
+      withSQLConf(
+        "spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName,
+        "spark.sql.catalog.h2.url" -> url,
+        "spark.sql.catalog.h2.driver" -> "org.h2.Driver") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("CREATE NAMESPACE h2.test_namespace LOCATION './samplepath'")
+          },
+          errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND",
+          sqlState = "46110",
+          parameters = Map("cmd" -> toSQLStmt("CREATE NAMESPACE ... LOCATION ...")))
+      }
+    }
+  }
+
+  test("ALTER NAMESPACE with property other than COMMENT " +
+    "for JDBC catalog should throw an exception") {
+    withTempDir { tempDir =>
+      val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
+      Utils.classForName("org.h2.Driver")
+      withSQLConf(
+        "spark.sql.catalog.h2" -> classOf[JDBCTableCatalog].getName,
+        "spark.sql.catalog.h2.url" -> url,
+        "spark.sql.catalog.h2.driver" -> "org.h2.Driver") {
+        val namespace = "h2.test_namespace"
+        withNamespace(namespace) {
+          sql(s"CREATE NAMESPACE $namespace")
+          checkError(
+            exception = intercept[AnalysisException] {
+              sql(s"ALTER NAMESPACE h2.test_namespace SET LOCATION '/tmp/loc_test_2'")
+            },
+            errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
+            sqlState = "46110",
+            parameters = Map(
+              "cmd" -> toSQLStmt("SET NAMESPACE"),
+              "property" -> toSQLConf("location")))
+
+          checkError(
+            exception = intercept[AnalysisException] {
+              sql(s"ALTER NAMESPACE h2.test_namespace SET PROPERTIES('a'='b')")
+            },
+            errorClass = "NOT_SUPPORTED_IN_JDBC_CATALOG.COMMAND_WITH_PROPERTY",
+            sqlState = "46110",
+            parameters = Map(
+              "cmd" -> toSQLStmt("SET NAMESPACE"),
+              "property" -> toSQLConf("a")))
+        }
+      }
+    }
+  }
+
+  test("ALTER TABLE UNSET nonexistent property should throw an exception") {
+    val tableName = "test_table"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (a STRING, b INT) USING parquet")
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $tableName UNSET TBLPROPERTIES ('test_prop1', 'test_prop2', 'comment')")
+        },
+        errorClass = "UNSET_NONEXISTENT_PROPERTIES",
+        parameters = Map(
+          "properties" -> "`test_prop1`, `test_prop2`",
+          "table" -> "`spark_catalog`.`default`.`test_table`")
+      )
+    }
+  }
 }
 
 class MyCastToString extends SparkUserDefinedFunction(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableSetSerdeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableSetSerdeSuite.scala
index c824f1a6ded..57660d43c2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableSetSerdeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableSetSerdeSuite.scala
@@ -36,11 +36,14 @@ class AlterTableSetSerdeSuite extends command.AlterTableSetSerdeSuiteBase with C
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) " +
         s"USING foo PARTITIONED BY (id)")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')")
-      }
-      assert(e.message.contains(
-        "ALTER TABLE ... SET [SERDE|SERDEPROPERTIES] is not supported for v2 tables"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',')")
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE",
+        sqlState = "46110",
+        parameters = Map("cmd" -> "ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]")
SET [SERDE|SERDEPROPERTIES]") + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d1b94815c5d..706dfa82abb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.command.CreateTableCommand import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.HiveExternalCatalog._ @@ -42,7 +43,10 @@ import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. */ -class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { +class MetastoreDataSourcesSuite extends QueryTest + with SQLTestUtils + with TestHiveSingleton + with QueryErrorsBase { import hiveContext._ import spark.implicits._ @@ -1335,8 +1339,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv test("read table with corrupted schema") { try { - val schema = StructType(StructField("int", IntegerType, true) :: Nil) - val hiveTable = CatalogTable( + val schema = StructType(StructField("int", IntegerType) :: Nil) + val hiveTableWithoutNumPartsProp = CatalogTable( identifier = TableIdentifier("t", Some("default")), tableType = CatalogTableType.MANAGED, schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, @@ -1344,23 +1348,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv storage = CatalogStorageFormat.empty, properties = Map( DATASOURCE_PROVIDER -> "json", - // no DATASOURCE_SCHEMA_NUMPARTS DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json)) - hiveClient.createTable(hiveTable, ignoreIfExists = false) + hiveClient.createTable(hiveTableWithoutNumPartsProp, ignoreIfExists = false) - val e = intercept[AnalysisException] { - sharedState.externalCatalog.getTable("default", "t") - }.getMessage - assert(e.contains("Cannot read table property 'spark.sql.sources.schema' as it's corrupted")) + checkError( + exception = intercept[AnalysisException] { + sharedState.externalCatalog.getTable("default", "t") + }, + errorClass = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY", + parameters = Map("key" -> toSQLConf("spark.sql.sources.schema")) + ) + + val hiveTableWithNumPartsProp = CatalogTable( + identifier = TableIdentifier("t2", Some("default")), + tableType = CatalogTableType.MANAGED, + schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, + provider = Some("json"), + storage = CatalogStorageFormat.empty, + properties = Map( + DATASOURCE_PROVIDER -> "json", + DATASOURCE_SCHEMA_PREFIX + "numParts" -> "3", + DATASOURCE_SCHEMA_PART_PREFIX + 0 -> schema.json)) + + hiveClient.createTable(hiveTableWithNumPartsProp, ignoreIfExists = false) + + checkError( + exception = intercept[AnalysisException] { + sharedState.externalCatalog.getTable("default", "t2") + }, + errorClass = "INSUFFICIENT_TABLE_PROPERTY.MISSING_KEY_PART", + parameters = Map( + "key" -> toSQLConf("spark.sql.sources.schema.part.1"), + 
"totalAmountOfParts" -> "3") + ) withDebugMode { val tableMeta = sharedState.externalCatalog.getTable("default", "t") assert(tableMeta.identifier == TableIdentifier("t", Some("default"))) assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json") + val tableMeta2 = sharedState.externalCatalog.getTable("default", "t2") + assert(tableMeta2.identifier == TableIdentifier("t2", Some("default"))) + assert(tableMeta2.properties(DATASOURCE_PROVIDER) == "json") } } finally { hiveClient.dropTable("default", "t", ignoreIfNotExists = true, purge = true) + hiveClient.dropTable("default", "t2", ignoreIfNotExists = true, purge = true) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 96408790259..f17ec922b9b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -770,8 +770,8 @@ class HiveDDLSuite assertAnalysisError( s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')", - "Attempted to unset non-existent property 'p' in table " + - s"'`$SESSION_CATALOG_NAME`.`default`.`view1`'") + "Attempted to unset non-existent properties [`p`] in table " + + s"`$SESSION_CATALOG_NAME`.`default`.`view1`") } } } @@ -1748,7 +1748,9 @@ class HiveDDLSuite assertAnalysisError( s"ALTER TABLE tbl UNSET TBLPROPERTIES ('${forbiddenPrefix}foo')", - s"${forbiddenPrefix}foo") + s"${(forbiddenPrefix.split(".") :+ "foo") + .map(part => s"`$part`") + .mkString(".")}") assertAnalysisError( s"CREATE TABLE tbl2 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org