This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ebb4858  [SPARK-35058][SQL] Group exception messages in hive/client
ebb4858 is described below

commit ebb4858f7185c6525adc4b23bc89f0a8262bf940
Author: beliefer <belie...@163.com>
AuthorDate: Wed Jun 9 08:23:09 2021 +0000

[SPARK-35058][SQL] Group exception messages in hive/client

### What changes were proposed in this pull request?
This PR groups the exception messages in `sql/hive/src/main/scala/org/apache/spark/sql/hive/client` into the centralized `QueryCompilationErrors` and `QueryExecutionErrors` objects.

### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests; all original tests pass, confirming that no existing behavior is broken.

Closes #32763 from beliefer/SPARK-35058.

Lead-authored-by: beliefer <belie...@163.com>
Co-authored-by: gengjiaan <gengji...@360.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
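The whole diff below applies one mechanical pattern: message formatting moves out of the call sites and into a dedicated error object, so every user-facing string lives in one auditable place. Here is a minimal self-contained sketch of that pattern; the names are illustrative stand-ins, not the actual Spark classes (the real code uses `org.apache.spark.sql.AnalysisException` and the `QueryCompilationErrors` / `QueryExecutionErrors` objects).

```scala
// Sketch of the error-grouping pattern with stand-in names.
class AnalysisException(message: String) extends Exception(message)

object CompilationErrors {
  // One factory method per error condition; it builds the Throwable but does
  // not throw, leaving control flow to the caller.
  def tableTypeUnsupportedError(tableType: String): Throwable =
    new AnalysisException(s"Hive $tableType is not supported.")
}

object CallSiteExample extends App {
  // Call sites no longer format strings inline; they throw the shared error.
  try {
    throw CompilationErrors.tableTypeUnsupportedError("index table")
  } catch {
    case e: AnalysisException => println(e.getMessage) // Hive index table is not supported.
  }
}
```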
" + + "Please use Hive 0.13 or higher.") + } + + def unknownHiveResourceTypeError(resourceType: String): Throwable = { + new AnalysisException(s"Unknown resource type: $resourceType") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index efb333c..c57e6e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.errors import java.io.{FileNotFoundException, IOException} -import java.net.URISyntaxException +import java.lang.reflect.InvocationTargetException +import java.net.{URISyntaxException, URL} import java.sql.{SQLException, SQLFeatureNotSupportedException} import java.time.{DateTimeException, LocalDate} import java.time.temporal.ChronoField @@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.WalkedTypePath import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, UnevaluableAggregate} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{DomainJoin, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval @@ -1256,4 +1258,75 @@ object QueryExecutionErrors { def cannotCreateStagingDirError(message: String, e: IOException): Throwable = { new RuntimeException(s"Cannot create staging directory: $message", e) } + + def serDeInterfaceNotFoundError(e: NoClassDefFoundError): Throwable = { + new ClassNotFoundException("The SerDe interface removed since Hive 2.3(HIVE-15167)." + + " Please migrate your custom SerDes to Hive 2.3. See HIVE-15167 for more details.", e) + } + + def convertHiveTableToCatalogTableError( + e: SparkException, dbName: String, tableName: String): Throwable = { + new SparkException(s"${e.getMessage}, db: $dbName, table: $tableName", e) + } + + def cannotRecognizeHiveTypeError( + e: ParseException, fieldType: String, fieldName: String): Throwable = { + new SparkException( + s"Cannot recognize hive type string: $fieldType, column: $fieldName", e) + } + + def getTablesByTypeUnsupportedByHiveVersionError(): Throwable = { + new UnsupportedOperationException("Hive 2.2 and lower versions don't support " + + "getTablesByType. Please use Hive 2.3 or higher version.") + } + + def dropTableWithPurgeUnsupportedError(): Throwable = { + new UnsupportedOperationException("DROP TABLE ... PURGE") + } + + def alterTableWithDropPartitionAndPurgeUnsupportedError(): Throwable = { + new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE") + } + + def invalidPartitionFilterError(): Throwable = { + new UnsupportedOperationException( + """Partition filter cannot have both `"` and `'` characters""") + } + + def getPartitionMetadataByFilterError(e: InvocationTargetException): Throwable = { + new RuntimeException( + s""" + |Caught Hive MetaException attempting to get partition metadata by filter + |from Hive. You can set the Spark configuration setting + |${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around + |this problem, however this will result in degraded performance. 
Please + |report a bug: https://issues.apache.org/jira/browse/SPARK + """.stripMargin.replaceAll("\n", " "), e) + } + + def unsupportedHiveMetastoreVersionError(version: String, key: String): Throwable = { + new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " + + s"Please set $key with a valid version.") + } + + def loadHiveClientCausesNoClassDefFoundError( + cnf: NoClassDefFoundError, + execJars: Seq[URL], + key: String, + e: InvocationTargetException): Throwable = { + new ClassNotFoundException( + s""" + |$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n + |Please make sure that jars for your version of hive and hadoop are included in the + |paths passed to $key. + """.stripMargin.replaceAll("\n", " "), e) + } + + def cannotFetchTablesOfDatabaseError(dbName: String, e: Exception): Throwable = { + new SparkException(s"Unable to fetch tables of db $dbName", e) + } + + def illegalLocationClauseForViewPartitionError(): Throwable = { + new SparkException("LOCATION clause illegal for view partition") + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bdf4905..6a92fb0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -47,7 +47,6 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ @@ -56,6 +55,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA @@ -312,8 +312,7 @@ private[hive] class HiveClientImpl( f } catch { case e: NoClassDefFoundError if e.getMessage.contains("apache/hadoop/hive/serde2/SerDe") => - throw new ClassNotFoundException("The SerDe interface removed since Hive 2.3(HIVE-15167)." + - " Please migrate your custom SerDes to Hive 2.3. See HIVE-15167 for more details.", e) + throw QueryExecutionErrors.serDeInterfaceNotFoundError(e) } finally { state.getConf.setClassLoader(originalConfLoader) Thread.currentThread().setContextClassLoader(original) @@ -366,8 +365,7 @@ private[hive] class HiveClientImpl( if (!getDatabase(database.name).locationUri.equals(database.locationUri)) { // SPARK-29260: Enable supported versions once it support altering database location. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bdf4905..6a92fb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,6 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
@@ -56,6 +55,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
@@ -312,8 +312,7 @@ private[hive] class HiveClientImpl(
       f
     } catch {
       case e: NoClassDefFoundError if e.getMessage.contains("apache/hadoop/hive/serde2/SerDe") =>
-        throw new ClassNotFoundException("The SerDe interface removed since Hive 2.3(HIVE-15167)." +
-          " Please migrate your custom SerDes to Hive 2.3. See HIVE-15167 for more details.", e)
+        throw QueryExecutionErrors.serDeInterfaceNotFoundError(e)
     } finally {
       state.getConf.setClassLoader(originalConfLoader)
       Thread.currentThread().setContextClassLoader(original)
@@ -366,8 +365,7 @@ private[hive] class HiveClientImpl(
     if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
       // SPARK-29260: Enable supported versions once it support altering database location.
       if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-        throw new AnalysisException(
-          s"Hive ${version.fullVersion} does not support altering database location")
+        throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
       }
     }
     val hiveDb = toHiveDatabase(database)
@@ -419,7 +417,7 @@ private[hive] class HiveClientImpl(
         .map(extraFixesForNonView).map(new HiveTable(_)).toSeq
     } catch {
       case ex: Exception =>
-        throw new HiveException(s"Unable to fetch tables of db $dbName", ex);
+        throw QueryExecutionErrors.cannotFetchTablesOfDatabaseError(dbName, ex)
     }
   }
@@ -447,8 +445,8 @@ private[hive] class HiveClientImpl(
       (h.getCols.asScala.map(fromHiveColumn), h.getPartCols.asScala.map(fromHiveColumn))
     } catch {
       case ex: SparkException =>
-        throw new SparkException(
-          s"${ex.getMessage}, db: ${h.getDbName}, table: ${h.getTableName}", ex)
+        throw QueryExecutionErrors.convertHiveTableToCatalogTableError(
+          ex, h.getDbName, h.getTableName)
     }
     val schema = StructType((cols ++ partCols).toSeq)
@@ -515,7 +513,7 @@ private[hive] class HiveClientImpl(
         case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
         case unsupportedType =>
           val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
-          throw new AnalysisException(s"Hive $tableTypeStr is not supported.")
+          throw QueryCompilationErrors.hiveTableTypeUnsupportedError(tableTypeStr)
       },
       schema = schema,
       partitionColumnNames = partCols.map(_.name).toSeq,
@@ -1017,8 +1015,7 @@ private[hive] object HiveClientImpl extends Logging {
       CatalystSqlParser.parseDataType(hc.getType)
     } catch {
       case e: ParseException =>
-        throw new SparkException(
-          s"Cannot recognize hive type string: ${hc.getType}, column: ${hc.getName}", e)
+        throw QueryExecutionErrors.cannotRecognizeHiveTypeError(e, hc.getType, hc.getName)
     }
   }
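The `cannotRecognizeHiveTypeError` path in the last hunk fires when `CatalystSqlParser` rejects a column's Hive type string. The underlying parse step can be reproduced directly; a small sketch (`CatalystSqlParser` is an internal Catalyst API, so treat this as illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

object ParseDemo extends App {
  // A well-formed Hive type string parses into a Catalyst DataType...
  val parsed = CatalystSqlParser.parseDataType("map<string,int>")
  println(parsed.catalogString) // map<string,int>

  // ...while a malformed one raises ParseException, which HiveClientImpl now
  // wraps via QueryExecutionErrors.cannotRecognizeHiveTypeError.
  try CatalystSqlParser.parseDataType("struct<oops") catch {
    case e: ParseException =>
      println("unrecognized type string: " + e.getMessage.takeWhile(_ != '\n'))
  }
}
```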
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 0a5b514..0a22fde 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -33,19 +33,19 @@ import org.apache.hadoop.hive.metastore.TableType
 import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.io.AcidUtils
-import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TypeUtils}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{AtomicType, DateType, IntegralType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -325,7 +325,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
         // Ignore this partition since it already exists and ignoreIfExists == true
       } else {
         if (location == null && table.isView()) {
-          throw new HiveException("LOCATION clause illegal for view partition");
+          throw QueryExecutionErrors.illegalLocationClauseForViewPartitionError()
         }
 
         createPartitionMethod.invoke(
@@ -378,8 +378,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       dbName: String,
       pattern: String,
       tableType: TableType): Seq[String] = {
-    throw new UnsupportedOperationException("Hive 2.2 and lower versions don't support " +
-      "getTablesByType. Please use Hive 2.3 or higher version.")
+    throw QueryExecutionErrors.getTablesByTypeUnsupportedByHiveVersionError()
   }
 
   override def loadPartition(
@@ -428,7 +427,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = {
     if (purge) {
-      throw new UnsupportedOperationException("DROP TABLE ... PURGE")
+      throw QueryExecutionErrors.dropTableWithPurgeUnsupportedError()
     }
     hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
   }
@@ -449,14 +448,13 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       deleteData: Boolean,
       purge: Boolean): Unit = {
     if (purge) {
-      throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE")
+      throw QueryExecutionErrors.alterTableWithDropPartitionAndPurgeUnsupportedError()
     }
     hive.dropPartition(dbName, tableName, part, deleteData)
   }
 
   override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
-    throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
-      "Please use Hive 0.13 or higher.")
+    throw QueryCompilationErrors.hiveCreatePermanentFunctionsUnsupportedError()
  }
 
   def dropFunction(hive: Hive, db: String, name: String): Unit = {
@@ -599,7 +597,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       case ResourceType.ARCHIVE => "archive"
       case ResourceType.FILE => "file"
       case ResourceType.JAR => "jar"
-      case r => throw new AnalysisException(s"Unknown resource type: $r")
+      case r => throw QueryCompilationErrors.unknownHiveResourceTypeError(r.toString)
     }
     FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
   }
@@ -859,8 +857,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     } else if (!str.contains("'")) {
       s"""'$str'"""
     } else {
-      throw new UnsupportedOperationException(
-        """Partition filter cannot have both `"` and `'` characters""")
+      throw QueryExecutionErrors.invalidPartitionFilterError()
     }
   }
@@ -901,11 +898,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
         case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
             tryDirectSql =>
-          throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
-            "metadata by filter from Hive. You can set the Spark configuration setting " +
-            s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " +
-            "problem, however this will result in degraded performance. Please report a bug: " +
-            "https://issues.apache.org/jira/browse/SPARK", ex)
+          throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
       }
   }
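For readers new to this file: `HiveShim` dispatches version-specific behavior through a chain of shims, where the oldest shim throws the (now centralized) unsupported-feature errors and newer shims override them. A minimal sketch of that inheritance pattern, with illustrative names rather than Spark's actual `Shim` trait:

```scala
// Illustrative sketch of the shim chain in HiveShim.scala. The base shim
// rejects features its Hive version lacks; a later shim overrides the method.
trait ShimSketch {
  def dropTable(name: String, purge: Boolean): Unit
}

class ShimV012Sketch extends ShimSketch {
  override def dropTable(name: String, purge: Boolean): Unit = {
    // Hive 0.12 cannot PURGE, so the centralized error is raised here.
    if (purge) throw new UnsupportedOperationException("DROP TABLE ... PURGE")
    println(s"dropped $name (no purge)")
  }
}

class ShimV014Sketch extends ShimV012Sketch {
  // A newer client supports purge natively, so the override just honors it.
  override def dropTable(name: String, purge: Boolean): Unit =
    println(s"dropped $name, purge = $purge")
}

object ShimDemo extends App {
  val shim: ShimSketch = new ShimV014Sketch
  shim.dropTable("t", purge = true) // dispatches to the newest shim's behavior
}
```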
Please report a bug: " + - "https://issues.apache.org/jira/browse/SPARK", ex) + throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 6171d32..40ef10b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -34,6 +34,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader import org.apache.spark.sql.internal.SQLConf @@ -104,8 +105,8 @@ private[hive] object IsolatedClientLoader extends Logging { case (3, 1, _) => Some(hive.v3_1) case _ => None }.getOrElse { - throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " + - s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.") + throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError( + version, HiveUtils.HIVE_METASTORE_VERSION.key) } } @@ -311,10 +312,8 @@ private[hive] class IsolatedClientLoader( case e: InvocationTargetException => if (e.getCause().isInstanceOf[NoClassDefFoundError]) { val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] - throw new ClassNotFoundException( - s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + - "Please make sure that jars for your version of hive and hadoop are included in the " + - s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e) + throw QueryExecutionErrors.loadHiveClientCausesNoClassDefFoundError( + cnf, execJars, HiveUtils.HIVE_METASTORE_JARS.key, e) } else { throw e } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org