This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6278becfbed  [SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)
6278becfbed is described below

commit 6278becfbed412bad3d00f2b7989fd19a3a0ff07
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Mon Jul 18 23:34:28 2022 -0700

    [SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)

    ### What changes were proposed in this pull request?
    Implement listIndexes in DS V2 JDBC for the H2 dialect.

    ### Why are the changes needed?
    This is a subtask of the V2 index support (https://issues.apache.org/jira/browse/SPARK-36525).
    **It makes it easier to test the index interface locally.**

    > This PR implements listIndexes in the H2 dialect.

    ### Does this PR introduce _any_ user-facing change?
    Yes, listIndexes in DS V2 JDBC for H2.

    ### How was this patch tested?
    Updated existing UT.

    Closes #37172 from panbingkun/h2dialect_listindex_dev.

    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: huaxingao <huaxin_...@apple.com>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  4 +-
 .../execution/datasources/v2/jdbc/JDBCTable.scala  |  2 +-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      | 66 +++++++++++++++++++++-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  2 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  4 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    |  9 +++
 6 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4401ee4564e..60ecd2ff60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1072,10 +1072,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     val dialect = JdbcDialects.get(options.url)
-    dialect.listIndexes(conn, tableName, options)
+    dialect.listIndexes(conn, tableIdent, options)
   }
 
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index be8e1c68b7c..0a184116a0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -83,7 +83,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
 
   override def listIndexes(): Array[TableIndex] = {
     JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.listIndexes(conn, name, jdbcOptions)
+      JdbcUtils.listIndexes(conn, ident, jdbcOptions)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index d41929225a8..4200ba91fb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang3.StringUtils
+
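Note: the JdbcUtils and JDBCTable changes above switch the listIndexes plumbing from a bare table-name String to the table's Identifier, so a dialect can see both the table name and its schema. A small illustrative snippet (not part of the patch; the namespace and table values are made-up examples) showing the two accessors the dialects rely on:

    // Illustrative only: the Identifier accessors used once listIndexes
    // receives an Identifier instead of a plain table-name String.
    import org.apache.spark.sql.connector.catalog.Identifier

    val ident = Identifier.of(Array("test"), "people")
    ident.name()            // "people" -> used for the TABLE_NAME filter (H2) / SHOW INDEXES FROM (MySQL)
    ident.namespace().last  // "test"   -> used for the INDEX_SCHEMA filter (H2)
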
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
 
@@ -110,6 +112,64 @@ private[sql] object H2Dialect extends JdbcDialect {
     JdbcUtils.checkIfIndexExists(conn, sql, options)
   }
 
+  // See
+  // https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+  // https://www.h2database.com/html/systemtables.html?#information_schema_index_columns
+  override def listIndexes(
+      conn: Connection,
+      tableIdent: Identifier,
+      options: JDBCOptions): Array[TableIndex] = {
+    val sql = {
+      s"""
+         | SELECT
+         |   i.INDEX_CATALOG AS INDEX_CATALOG,
+         |   i.INDEX_SCHEMA AS INDEX_SCHEMA,
+         |   i.INDEX_NAME AS INDEX_NAME,
+         |   i.INDEX_TYPE_NAME AS INDEX_TYPE_NAME,
+         |   i.REMARKS as REMARKS,
+         |   ic.COLUMN_NAME AS COLUMN_NAME
+         | FROM INFORMATION_SCHEMA.INDEXES i, INFORMATION_SCHEMA.INDEX_COLUMNS ic
+         | WHERE i.TABLE_CATALOG = ic.TABLE_CATALOG
+         | AND i.TABLE_SCHEMA = ic.TABLE_SCHEMA
+         | AND i.TABLE_NAME = ic.TABLE_NAME
+         | AND i.INDEX_CATALOG = ic.INDEX_CATALOG
+         | AND i.INDEX_SCHEMA = ic.INDEX_SCHEMA
+         | AND i.INDEX_NAME = ic.INDEX_NAME
+         | AND i.TABLE_NAME = '${tableIdent.name()}'
+         | AND i.INDEX_SCHEMA = '${tableIdent.namespace().last}'
+         |""".stripMargin
+    }
+    var indexMap: Map[String, TableIndex] = Map()
+    try {
+      JdbcUtils.executeQuery(conn, options, sql) { rs =>
+        while (rs.next()) {
+          val indexName = rs.getString("INDEX_NAME")
+          val colName = rs.getString("COLUMN_NAME")
+          val indexType = rs.getString("INDEX_TYPE_NAME")
+          val indexComment = rs.getString("REMARKS")
+          if (indexMap.contains(indexName)) {
+            val index = indexMap(indexName)
+            val newIndex = new TableIndex(indexName, indexType,
+              index.columns() :+ FieldReference(colName),
+              index.columnProperties, index.properties)
+            indexMap += (indexName -> newIndex)
+          } else {
+            val properties = new util.Properties()
+            if (StringUtils.isNotEmpty(indexComment)) properties.put("COMMENT", indexComment)
+            val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+              new util.HashMap[NamedReference, util.Properties](), properties)
+            indexMap += (indexName -> index)
+          }
+        }
+      }
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+    }
+
+    indexMap.values.toArray
+  }
+
   private def tableNameWithSchema(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
   }
@@ -161,7 +221,7 @@ private[sql] object H2Dialect extends JdbcDialect {
         funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
       if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) {
         throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
-          s"support aggregate function: $funcName with DISTINCT");
+          s"support aggregate function: $funcName with DISTINCT")
       } else {
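Note: the join above returns one row per (index, column); the dialect folds those rows into a single TableIndex per index name, appending a FieldReference for every INDEX_COLUMNS row. A standalone sketch (not part of the patch; it assumes the H2 driver on the classpath and an existing TEST.PEOPLE table with at least one index, and it trims the join down to the index key columns) that runs essentially the same query over plain JDBC to inspect the raw rows:

    // Standalone sketch: see the raw INFORMATION_SCHEMA rows that the dialect
    // later folds into TableIndex objects. URL, schema (TEST) and table (PEOPLE)
    // are assumptions for illustration.
    import java.sql.DriverManager

    val conn = DriverManager.getConnection("jdbc:h2:mem:testdb0;DB_CLOSE_DELAY=-1", "sa", "")
    try {
      val rs = conn.createStatement().executeQuery(
        """SELECT i.INDEX_NAME, i.INDEX_TYPE_NAME, i.REMARKS, ic.COLUMN_NAME
          |FROM INFORMATION_SCHEMA.INDEXES i
          |JOIN INFORMATION_SCHEMA.INDEX_COLUMNS ic
          |  ON i.INDEX_CATALOG = ic.INDEX_CATALOG
          | AND i.INDEX_SCHEMA = ic.INDEX_SCHEMA
          | AND i.INDEX_NAME = ic.INDEX_NAME
          |WHERE i.TABLE_NAME = 'PEOPLE' AND i.INDEX_SCHEMA = 'TEST'""".stripMargin)
      while (rs.next()) {
        // One row per (index, column); a multi-column index shows up as several rows
        // sharing the same INDEX_NAME, which is why the dialect accumulates columns per name.
        println(Seq("INDEX_NAME", "INDEX_TYPE_NAME", "COLUMN_NAME", "REMARKS")
          .map(c => rs.getString(c)).mkString(" | "))
      }
    } finally {
      conn.close()
    }
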
         super.visitAggregateFunction(funcName, isDistinct, inputs)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 491e0231a23..230276e7100 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -522,7 +522,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
   def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
     throw new UnsupportedOperationException("listIndexes is not supported")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7dc76eed49f..d88f3566eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -210,9 +210,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
   // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
   override def listIndexes(
       conn: Connection,
-      tableName: String,
+      tableIdent: Identifier,
       options: JDBCOptions): Array[TableIndex] = {
-    val sql = s"SHOW INDEXES FROM $tableName"
+    val sql = s"SHOW INDEXES FROM ${tableIdent.name()}"
     var indexMap: Map[String, TableIndex] = Map()
     try {
       JdbcUtils.executeQuery(conn, options, sql) { rs =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index ddcf28652e9..ac1c59ae01e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2259,11 +2259,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .asInstanceOf[SupportsIndex]
     assert(jdbcTable != null)
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes1 = jdbcTable.listIndexes()
+    assert(indexes1.isEmpty)
 
     sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
     assert(jdbcTable.indexExists("people_index"))
+    val indexes2 = jdbcTable.listIndexes()
+    assert(!indexes2.isEmpty)
+    assert(indexes2.size == 1)
+    val tableIndex = indexes2.head
+    assert(tableIndex.indexName() == "people_index")
 
     sql(s"DROP INDEX people_index ON TABLE h2.test.people")
     assert(jdbcTable.indexExists("people_index") == false)
+    val indexes3 = jdbcTable.listIndexes()
+    assert(indexes3.isEmpty)
   }
 }
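Note: outside the suite, the new capability is reached through the SupportsIndex interface of the loaded table. A rough usage sketch (not part of the patch; it assumes a spark-shell with the H2 driver on the classpath and an existing test.people table that already has an index, e.g. people_index created via the test's CREATE INDEX statement; JDBCTableCatalog is an internal class, so this mirrors what the suite's h2 catalog does rather than a stable public API):

    // Rough usage sketch: list indexes through SupportsIndex the way the updated
    // test does. URL, schema and table names are assumptions for illustration.
    import scala.collection.JavaConverters._

    import org.apache.spark.sql.connector.catalog.Identifier
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex
    import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val catalog = new JDBCTableCatalog()
    catalog.initialize("h2", new CaseInsensitiveStringMap(Map(
      "url" -> "jdbc:h2:mem:testdb0;user=testUser;password=testPass",  // assumed H2 URL
      "driver" -> "org.h2.Driver").asJava))

    val people = catalog.loadTable(Identifier.of(Array("test"), "people"))
      .asInstanceOf[SupportsIndex]

    people.listIndexes().foreach { idx =>
      // Prints each index name, its type, and the columns it covers.
      println(s"${idx.indexName()} (${idx.indexType()}): ${idx.columns().mkString(", ")}")
    }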