This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b3fd5c9  [SPARK-31184][SQL] Support getTablesByType API of Hive Client
b3fd5c9 is described below

commit b3fd5c90ae00807f3a49e2b9f1f0c8d026cc4219
Author: Eric Wu <492960...@qq.com>
AuthorDate: Sat Mar 21 17:41:23 2020 -0700

    [SPARK-31184][SQL] Support getTablesByType API of Hive Client
    
    ### What changes were proposed in this pull request?
    Hive 2.3+ supports the `getTablesByType` API, which provides an efficient
    way to get Hive tables of a specific type. Currently, we have the following
    mappings when using `HiveExternalCatalog`:
    ```
    CatalogTableType.EXTERNAL  =>  HiveTableType.EXTERNAL_TABLE
    CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE
    CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
    ```
    Without this API, we have to achieve the same goal via `getTables` +
    `getTablesByName` + a filter on the table type.
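
    For illustration, a minimal sketch of that fallback composition, assuming a
    `HiveClientImpl`-like context where `client`, `getTablesByName`, and the
    `asScala` converters are in scope:
    ```scala
    // Fallback: list table names by pattern, fetch their full metadata,
    // then keep only the tables whose type matches the requested one.
    val tableNames = client.getTablesByPattern(dbName, pattern).asScala
    val matching = getTablesByName(dbName, tableNames)
      .filter(_.tableType == tableType)
    matching.map(_.identifier.table)
    ```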
    
    This PR adds `getTablesByType` to `HiveShim`. For Hive versions that don't
    support this API, an `UnsupportedOperationException` is thrown, and the
    upper-level logic should catch the exception and fall back to the filter
    solution mentioned above.
    
    Since the JDK11-related fix in `Hive` has not been released yet, manual
    tests against Hive 2.3.7-SNAPSHOT were done by following the instructions
    in SPARK-29245.
    
    ### Why are the changes needed?
    This API provides better usability and performance when we want to get a
    list of Hive tables of a specific type, for example
    `HiveTableType.VIRTUAL_VIEW`, which corresponds to `CatalogTableType.VIEW`.
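
    As a hedged usage sketch (the database name and pattern below are
    illustrative, not taken from this commit):
    ```scala
    // List all views in the `default` database. On Hive 2.3+ this is a
    // single metastore call; on older versions HiveClientImpl falls back
    // to listing names by pattern and filtering by table type.
    val views = client.listTablesByType(
      "default", pattern = "*", tableType = CatalogTableType.VIEW)
    ```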
    
    ### Does this PR introduce any user-facing change?
    No, this is a support function.
    
    ### How was this patch tested?
    Added tests in `VersionsSuite` and manually ran the JDK11 tests with the
    following settings:
    
    - Hive 2.3.6 Metastore on JDK8
    - Hive 2.3.7-SNAPSHOT library built from source of the Hive 2.3 branch
    - Spark built with Hive 2.3.7-SNAPSHOT on jdk-11.0.6
    
    Closes #27952 from Eric5553/GetTableByType.
    
    Authored-by: Eric Wu <492960...@qq.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 3a48ea1fe0fb85253f12d86caea01ffcb7e678d0)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/sql/hive/client/HiveClient.scala  |  9 +++++
 .../spark/sql/hive/client/HiveClientImpl.scala     | 42 ++++++++++++++++------
 .../apache/spark/sql/hive/client/HiveShim.scala    | 35 +++++++++++++++++-
 .../spark/sql/hive/client/VersionsSuite.scala      | 22 +++++++++---
 4 files changed, 92 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index e31dffa..3ea80ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -61,6 +61,15 @@ private[hive] trait HiveClient {
   /** Returns the names of tables in the given database that matches the given pattern. */
   def listTables(dbName: String, pattern: String): Seq[String]
 
+  /**
+   * Returns the names of tables with specific tableType in the given database that matches
+   * the given pattern.
+   */
+  def listTablesByType(
+      dbName: String,
+      pattern: String,
+      tableType: CatalogTableType): Seq[String]
+
   /** Sets the name of current database. */
   def setCurrentDatabase(databaseName: String): Unit
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 4a3e813..6ad5e9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -755,6 +755,22 @@ private[hive] class HiveClientImpl(
     client.getTablesByPattern(dbName, pattern).asScala
   }
 
+  override def listTablesByType(
+      dbName: String,
+      pattern: String,
+      tableType: CatalogTableType): Seq[String] = withHiveState {
+    try {
+      // Try with Hive API getTablesByType first, it's supported from Hive 2.3+.
+      shim.getTablesByType(client, dbName, pattern, toHiveTableType(tableType))
+    } catch {
+      case _: UnsupportedOperationException =>
+        // Fallback to filter logic if getTablesByType not supported.
+        val tableNames = client.getTablesByPattern(dbName, pattern).asScala
+        val tables = getTablesByName(dbName, tableNames).filter(_.tableType == tableType)
+        tables.map(_.identifier.table)
+    }
+  }
+
   /**
    * Runs the specified SQL query using Hive.
    */
@@ -1011,25 +1027,29 @@ private[hive] object HiveClientImpl extends Logging {
   private def toOutputFormat(name: String) =
     Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name)
 
+  def toHiveTableType(catalogTableType: CatalogTableType): HiveTableType = {
+    catalogTableType match {
+      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE
+      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE
+      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
+      case t =>
+        throw new IllegalArgumentException(
+          s"Unknown table type is found at toHiveTableType: $t")
+    }
+  }
+
   /**
    * Converts the native table metadata representation format CatalogTable to Hive's Table.
    */
   def toHiveTable(table: CatalogTable, userName: Option[String] = None): HiveTable = {
     val hiveTable = new HiveTable(table.database, table.identifier.table)
+    hiveTable.setTableType(toHiveTableType(table.tableType))
     // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties.
     // Otherwise, Hive metastore will change the table to a MANAGED_TABLE.
     // (metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java#L1095-L1105)
-    hiveTable.setTableType(table.tableType match {
-      case CatalogTableType.EXTERNAL =>
-        hiveTable.setProperty("EXTERNAL", "TRUE")
-        HiveTableType.EXTERNAL_TABLE
-      case CatalogTableType.MANAGED =>
-        HiveTableType.MANAGED_TABLE
-      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW
-      case t =>
-        throw new IllegalArgumentException(
-          s"Unknown table type is found at toHiveTable: $t")
-    })
+    if (table.tableType == CatalogTableType.EXTERNAL) {
+      hiveTable.setProperty("EXTERNAL", "TRUE")
+    }
     // Note: In Hive the schema and partition columns must be disjoint sets
     val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
       table.partitionColumnNames.contains(c.getName)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 50ce536..2b80660 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.IMetaStoreClient
+import org.apache.hadoop.hive.metastore.TableType
 import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.io.AcidUtils
@@ -90,6 +91,12 @@ private[client] sealed abstract class Shim {
 
   def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
 
+  def getTablesByType(
+      hive: Hive,
+      dbName: String,
+      pattern: String,
+      tableType: TableType): Seq[String]
+
   def createPartitions(
       hive: Hive,
       db: String,
@@ -363,6 +370,15 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000L
   }
 
+  override def getTablesByType(
+      hive: Hive,
+      dbName: String,
+      pattern: String,
+      tableType: TableType): Seq[String] = {
+    throw new UnsupportedOperationException("Hive 2.2 and lower versions don't support " +
+      "getTablesByType. Please use Hive 2.3 or higher version.")
+  }
+
   override def loadPartition(
       hive: Hive,
       loadPath: Path,
@@ -1220,7 +1236,24 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
 
 private[client] class Shim_v2_2 extends Shim_v2_1
 
-private[client] class Shim_v2_3 extends Shim_v2_1
+private[client] class Shim_v2_3 extends Shim_v2_1 {
+  private lazy val getTablesByTypeMethod =
+    findMethod(
+      classOf[Hive],
+      "getTablesByType",
+      classOf[String],
+      classOf[String],
+      classOf[TableType])
+
+  override def getTablesByType(
+      hive: Hive,
+      dbName: String,
+      pattern: String,
+      tableType: TableType): Seq[String] = {
+    getTablesByTypeMethod.invoke(hive, dbName, pattern, tableType)
+      .asInstanceOf[JList[String]].asScala
+  }
+}
 
 private[client] class Shim_v3_0 extends Shim_v2_3 {
   // Spark supports only non-ACID operations
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7471142..ba75dcf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -154,10 +154,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
         .client.version.fullVersion.startsWith(version))
     }
 
-    def table(database: String, tableName: String): CatalogTable = {
+    def table(database: String, tableName: String,
+        tableType: CatalogTableType = CatalogTableType.MANAGED): CatalogTable = {
       CatalogTable(
         identifier = TableIdentifier(tableName, Some(database)),
-        tableType = CatalogTableType.MANAGED,
+        tableType = tableType,
         schema = new StructType().add("key", "int"),
         storage = CatalogStorageFormat(
           locationUri = None,
@@ -273,7 +274,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: createTable") {
       client.createTable(table("default", tableName = "src"), ignoreIfExists = 
false)
-      client.createTable(table("default", "temporary"), ignoreIfExists = false)
+      client.createTable(table("default", tableName = "temporary"), 
ignoreIfExists = false)
+      client.createTable(table("default", tableName = "view1", tableType = 
CatalogTableType.VIEW),
+        ignoreIfExists = false)
     }
 
     test(s"$version: loadTable") {
@@ -389,7 +392,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     }
 
     test(s"$version: listTables(database)") {
-      assert(client.listTables("default") === Seq("src", "temporary"))
+      assert(client.listTables("default") === Seq("src", "temporary", "view1"))
     }
 
     test(s"$version: listTables(database, pattern)") {
@@ -397,6 +400,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.listTables("default", pattern = "nonexist").isEmpty)
     }
 
+    test(s"$version: listTablesByType(database, pattern, tableType)") {
+      assert(client.listTablesByType("default", pattern = "view1",
+        CatalogTableType.VIEW) === Seq("view1"))
+      assert(client.listTablesByType("default", pattern = "nonexist",
+        CatalogTableType.VIEW).isEmpty)
+    }
+
     test(s"$version: dropTable") {
       val versionsWithoutPurge =
         if (versions.contains("0.14")) versions.takeWhile(_ != "0.14") else Nil
@@ -405,12 +415,16 @@ class VersionsSuite extends SparkFunSuite with Logging {
       try {
         client.dropTable("default", tableName = "temporary", ignoreIfNotExists 
= false,
           purge = true)
+        client.dropTable("default", tableName = "view1", ignoreIfNotExists = 
false,
+          purge = true)
         assert(!versionsWithoutPurge.contains(version))
       } catch {
         case _: UnsupportedOperationException =>
           assert(versionsWithoutPurge.contains(version))
           client.dropTable("default", tableName = "temporary", 
ignoreIfNotExists = false,
             purge = false)
+          client.dropTable("default", tableName = "view1", ignoreIfNotExists = 
false,
+            purge = false)
       }
       assert(client.listTables("default") === Seq("src"))
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
