This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6278becfbed [SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)
6278becfbed is described below

commit 6278becfbed412bad3d00f2b7989fd19a3a0ff07
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Mon Jul 18 23:34:28 2022 -0700

    [SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)
    
    ### What changes were proposed in this pull request?
    Implementing listIndexes in DS V2 JDBC for H2 dialect.
    
    ### Why are the changes needed?
    This is a subtask of the V2 Index support (https://issues.apache.org/jira/browse/SPARK-36525).
    **It makes the index interface easier to test locally.**
    > This PR implements listIndexes in H2 dialect.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, listIndexes is now supported in DS V2 JDBC for the H2 dialect.
    
    ### How was this patch tested?
    Updated the existing unit test.
    
    Closes #37172 from panbingkun/h2dialect_listindex_dev.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: huaxingao <huaxin_...@apple.com>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  4 +-
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |  2 +-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 66 +++++++++++++++++++++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  2 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  4 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    |  9 +++
 6 files changed, 78 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4401ee4564e..60ecd2ff60b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1072,10 +1072,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.listIndexes(conn, tableName, options)
+    dialect.listIndexes(conn, tableIdent, options)
   }
 
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: 
String): Unit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index be8e1c68b7c..0a184116a0f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -83,7 +83,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, 
jdbcOptions: JDBCOpt
 
   override def listIndexes(): Array[TableIndex] = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+      JdbcUtils.listIndexes(conn, ident, jdbcOptions)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index d41929225a8..4200ba91fb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, 
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, ShortType, StringType}
 
@@ -110,6 +112,64 @@ private[sql] object H2Dialect extends JdbcDialect {
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
+  // See
+  // https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  // https://www.h2database.com/html/systemtables.html?#information_schema_index_columns
+  override def listIndexes(
+      conn: Connection,
+      tableIdent: Identifier,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = {
+      s"""
+         | SELECT
+         |   i.INDEX_CATALOG AS INDEX_CATALOG,
+         |   i.INDEX_SCHEMA AS INDEX_SCHEMA,
+         |   i.INDEX_NAME AS INDEX_NAME,
+         |   i.INDEX_TYPE_NAME AS INDEX_TYPE_NAME,
+         |   i.REMARKS as REMARKS,
+         |   ic.COLUMN_NAME AS COLUMN_NAME
+         | FROM INFORMATION_SCHEMA.INDEXES i, INFORMATION_SCHEMA.INDEX_COLUMNS 
ic
+         | WHERE i.TABLE_CATALOG = ic.TABLE_CATALOG
+         | AND i.TABLE_SCHEMA = ic.TABLE_SCHEMA
+         | AND i.TABLE_NAME = ic.TABLE_NAME
+         | AND i.INDEX_CATALOG = ic.INDEX_CATALOG
+         | AND i.INDEX_SCHEMA = ic.INDEX_SCHEMA
+         | AND i.INDEX_NAME = ic.INDEX_NAME
+         | AND i.TABLE_NAME = '${tableIdent.name()}'
+         | AND i.INDEX_SCHEMA = '${tableIdent.namespace().last}'
+         |""".stripMargin
+    }
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      JdbcUtils.executeQuery(conn, options, sql) { rs =>
+        while (rs.next()) {
+          val indexName = rs.getString("INDEX_NAME")
+          val colName = rs.getString("COLUMN_NAME")
+          val indexType = rs.getString("INDEX_TYPE_NAME")
+          val indexComment = rs.getString("REMARKS")
+          if (indexMap.contains(indexName)) {
+            val index = indexMap(indexName)
+            val newIndex = new TableIndex(indexName, indexType,
+              index.columns() :+ FieldReference(colName),
+              index.columnProperties, index.properties)
+            indexMap += (indexName -> newIndex)
+          } else {
+            val properties = new util.Properties()
+            if (StringUtils.isNotEmpty(indexComment)) 
properties.put("COMMENT", indexComment)
+            val index = new TableIndex(indexName, indexType, 
Array(FieldReference(colName)),
+              new util.HashMap[NamedReference, util.Properties](), properties)
+            indexMap += (indexName -> index)
+          }
+        }
+      }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieved index info.")
+    }
+
+    indexMap.values.toArray
+  }
+
   private def tableNameWithSchema(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
   }
@@ -161,7 +221,7 @@ private[sql] object H2Dialect extends JdbcDialect {
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && 
distinctUnsupportedAggregateFunctions.contains(funcName)) {
         throw new 
UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+          s"support aggregate function: $funcName with DISTINCT")
       } else {
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 491e0231a23..230276e7100 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -522,7 +522,7 @@ abstract class JdbcDialect extends Serializable with 
Logging {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     throw new UnsupportedOperationException("listIndexes is not supported")
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7dc76eed49f..d88f3566eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -210,9 +210,9 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
   override def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
-    val sql = s"SHOW INDEXES FROM $tableName"
+    val sql = s"SHOW INDEXES FROM ${tableIdent.name()}"
     var indexMap: Map[String, TableIndex] = Map()
     try {
       JdbcUtils.executeQuery(conn, options, sql) { rs =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index ddcf28652e9..ac1c59ae01e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2259,11 +2259,20 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
       .asInstanceOf[SupportsIndex]
     assert(jdbcTable != null)
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes1 = jdbcTable.listIndexes()
+    assert(indexes1.isEmpty)
 
     sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
     assert(jdbcTable.indexExists("people_index"))
+    val indexes2 = jdbcTable.listIndexes()
+    assert(!indexes2.isEmpty)
+    assert(indexes2.size == 1)
+    val tableIndex = indexes2.head
+    assert(tableIndex.indexName() == "people_index")
 
     sql(s"DROP INDEX people_index ON TABLE h2.test.people")
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes3 = jdbcTable.listIndexes()
+    assert(indexes3.isEmpty)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to