This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b3fd5c9 [SPARK-31184][SQL] Support getTablesByType API of Hive Client b3fd5c9 is described below commit b3fd5c90ae00807f3a49e2b9f1f0c8d026cc4219 Author: Eric Wu <492960...@qq.com> AuthorDate: Sat Mar 21 17:41:23 2020 -0700 [SPARK-31184][SQL] Support getTablesByType API of Hive Client ### What changes were proposed in this pull request? Hive 2.3+ supports the `getTablesByType` API, which will provide an efficient way to get a HiveTable with a specific type. Now, we have the following mappings when using `HiveExternalCatalog`. ``` CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW ``` Without this API, we need to achieve the goal by `getTables` + `getTablesByName` + `filter with type`. This PR adds `getTablesByType` in `HiveShim`. For those Hive versions that don't support this API, `UnsupportedOperationException` will be thrown. And the upper logic should catch the exception and fall back to the filter solution mentioned above. Since the JDK11-related fix in `Hive` is not released yet, manual tests against hive 2.3.7-SNAPSHOT are done by following the instructions of SPARK-29245. ### Why are the changes needed? This API will provide better usability and performance if we want to get a list of hiveTables with a specific type. For example, `HiveTableType.VIRTUAL_VIEW` corresponds to `CatalogTableType.VIEW`. ### Does this PR introduce any user-facing change? No, this is a support function. ### How was this patch tested? Add tests in VersionsSuite and manually run the JDK11 test with the following settings: - Hive 2.3.6 Metastore on JDK8 - Hive 2.3.7-SNAPSHOT library built from source of the Hive 2.3 branch - Spark built with Hive 2.3.7-SNAPSHOT on jdk-11.0.6 Closes #27952 from Eric5553/GetTableByType. 
Authored-by: Eric Wu <492960...@qq.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 3a48ea1fe0fb85253f12d86caea01ffcb7e678d0) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/hive/client/HiveClient.scala | 9 +++++ .../spark/sql/hive/client/HiveClientImpl.scala | 42 ++++++++++++++++------ .../apache/spark/sql/hive/client/HiveShim.scala | 35 +++++++++++++++++- .../spark/sql/hive/client/VersionsSuite.scala | 22 +++++++++--- 4 files changed, 92 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index e31dffa..3ea80ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -61,6 +61,15 @@ private[hive] trait HiveClient { /** Returns the names of tables in the given database that matches the given pattern. */ def listTables(dbName: String, pattern: String): Seq[String] + /** + * Returns the names of tables with specific tableType in the given database that matches + * the given pattern. + */ + def listTablesByType( + dbName: String, + pattern: String, + tableType: CatalogTableType): Seq[String] + /** Sets the name of current database. 
*/ def setCurrentDatabase(databaseName: String): Unit diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 4a3e813..6ad5e9d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -755,6 +755,22 @@ private[hive] class HiveClientImpl( client.getTablesByPattern(dbName, pattern).asScala } + override def listTablesByType( + dbName: String, + pattern: String, + tableType: CatalogTableType): Seq[String] = withHiveState { + try { + // Try with Hive API getTablesByType first, it's supported from Hive 2.3+. + shim.getTablesByType(client, dbName, pattern, toHiveTableType(tableType)) + } catch { + case _: UnsupportedOperationException => + // Fallback to filter logic if getTablesByType not supported. + val tableNames = client.getTablesByPattern(dbName, pattern).asScala + val tables = getTablesByName(dbName, tableNames).filter(_.tableType == tableType) + tables.map(_.identifier.table) + } + } + /** * Runs the specified SQL query using Hive. */ @@ -1011,25 +1027,29 @@ private[hive] object HiveClientImpl extends Logging { private def toOutputFormat(name: String) = Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name) + def toHiveTableType(catalogTableType: CatalogTableType): HiveTableType = { + catalogTableType match { + case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE + case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE + case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW + case t => + throw new IllegalArgumentException( + s"Unknown table type is found at toHiveTableType: $t") + } + } + /** * Converts the native table metadata representation format CatalogTable to Hive's Table. 
*/ def toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) + hiveTable.setTableType(toHiveTableType(table.tableType)) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105) - hiveTable.setTableType(table.tableType match { - case CatalogTableType.EXTERNAL => - hiveTable.setProperty("EXTERNAL", "TRUE") - HiveTableType.EXTERNAL_TABLE - case CatalogTableType.MANAGED => - HiveTableType.MANAGED_TABLE - case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW - case t => - throw new IllegalArgumentException( - s"Unknown table type is found at toHiveTable: $t") - }) + if (table.tableType == CatalogTableType.EXTERNAL) { + hiveTable.setProperty("EXTERNAL", "TRUE") + } // Note: In Hive the schema and partition columns must be disjoint sets val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 50ce536..2b80660 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -29,6 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.IMetaStoreClient +import org.apache.hadoop.hive.metastore.TableType import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.io.AcidUtils @@ -90,6 +91,12 
@@ private[client] sealed abstract class Shim { def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit + def getTablesByType( + hive: Hive, + dbName: String, + pattern: String, + tableType: TableType): Seq[String] + def createPartitions( hive: Hive, db: String, @@ -363,6 +370,15 @@ private[client] class Shim_v0_12 extends Shim with Logging { conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L } + override def getTablesByType( + hive: Hive, + dbName: String, + pattern: String, + tableType: TableType): Seq[String] = { + throw new UnsupportedOperationException("Hive 2.2 and lower versions don't support " + + "getTablesByType. Please use Hive 2.3 or higher version.") + } + override def loadPartition( hive: Hive, loadPath: Path, @@ -1220,7 +1236,24 @@ private[client] class Shim_v2_1 extends Shim_v2_0 { private[client] class Shim_v2_2 extends Shim_v2_1 -private[client] class Shim_v2_3 extends Shim_v2_1 +private[client] class Shim_v2_3 extends Shim_v2_1 { + private lazy val getTablesByTypeMethod = + findMethod( + classOf[Hive], + "getTablesByType", + classOf[String], + classOf[String], + classOf[TableType]) + + override def getTablesByType( + hive: Hive, + dbName: String, + pattern: String, + tableType: TableType): Seq[String] = { + getTablesByTypeMethod.invoke(hive, dbName, pattern, tableType) + .asInstanceOf[JList[String]].asScala + } +} private[client] class Shim_v3_0 extends Shim_v2_3 { // Spark supports only non-ACID operations diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 7471142..ba75dcf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -154,10 +154,11 @@ class VersionsSuite extends SparkFunSuite with Logging { .client.version.fullVersion.startsWith(version)) 
} - def table(database: String, tableName: String): CatalogTable = { + def table(database: String, tableName: String, + tableType: CatalogTableType = CatalogTableType.MANAGED): CatalogTable = { CatalogTable( identifier = TableIdentifier(tableName, Some(database)), - tableType = CatalogTableType.MANAGED, + tableType = tableType, schema = new StructType().add("key", "int"), storage = CatalogStorageFormat( locationUri = None, @@ -273,7 +274,9 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: createTable") { client.createTable(table("default", tableName = "src"), ignoreIfExists = false) - client.createTable(table("default", "temporary"), ignoreIfExists = false) + client.createTable(table("default", tableName = "temporary"), ignoreIfExists = false) + client.createTable(table("default", tableName = "view1", tableType = CatalogTableType.VIEW), + ignoreIfExists = false) } test(s"$version: loadTable") { @@ -389,7 +392,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: listTables(database)") { - assert(client.listTables("default") === Seq("src", "temporary")) + assert(client.listTables("default") === Seq("src", "temporary", "view1")) } test(s"$version: listTables(database, pattern)") { @@ -397,6 +400,13 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(client.listTables("default", pattern = "nonexist").isEmpty) } + test(s"$version: listTablesByType(database, pattern, tableType)") { + assert(client.listTablesByType("default", pattern = "view1", + CatalogTableType.VIEW) === Seq("view1")) + assert(client.listTablesByType("default", pattern = "nonexist", + CatalogTableType.VIEW).isEmpty) + } + test(s"$version: dropTable") { val versionsWithoutPurge = if (versions.contains("0.14")) versions.takeWhile(_ != "0.14") else Nil @@ -405,12 +415,16 @@ class VersionsSuite extends SparkFunSuite with Logging { try { client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, purge = true) + 
client.dropTable("default", tableName = "view1", ignoreIfNotExists = false, + purge = true) assert(!versionsWithoutPurge.contains(version)) } catch { case _: UnsupportedOperationException => assert(versionsWithoutPurge.contains(version)) client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false, purge = false) + client.dropTable("default", tableName = "view1", ignoreIfNotExists = false, + purge = false) } assert(client.listTables("default") === Seq("src")) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org