This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 07ecbc4  [SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)
07ecbc4 is described below

commit 07ecbc4049aa7f8daa11e6a924c37c1db2f53c73
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Oct 8 11:30:12 2021 -0700

    [SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)

    ### What changes were proposed in this pull request?
    Implement `createIndex`/`indexExists` in DS V2 JDBC.

    ### Why are the changes needed?
    This is a subtask of the V2 index support. I am implementing index support for DS V2 JDBC so we can have a POC and end-to-end testing. This PR implements `createIndex` and `indexExists`; the next PR will implement `listIndexes` and `dropIndex`. I intentionally kept this PR small so it's easier to review.

    Indexes are not supported by the H2 database, and CREATE/DROP INDEX is not standard SQL syntax, so this PR only implements `createIndex` and `indexExists` in the MySQL dialect.

    ### Does this PR introduce _any_ user-facing change?
    Yes, `createIndex`/`indexExists` in DS V2 JDBC.

    ### How was this patch tested?
    New test.

    Closes #34164 from huaxingao/createIndexJDBC.

    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 33 ++++++++++
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  |  9 +++
 .../sql/connector/catalog/index/SupportsIndex.java |  4 +-
 .../catalyst/analysis/AlreadyExistException.scala  |  4 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 58 +++++++++++++++++
 .../execution/datasources/v2/jdbc/JDBCTable.scala  | 36 ++++++++++-
 .../datasources/v2/jdbc/JDBCTableCatalog.scala     | 55 ++++++----------
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 41 +++++++++++-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 74 +++++++++++++++++++++-
 9 files changed, 268 insertions(+), 46 deletions(-)
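For context, the new API exposes index management directly on a DS V2 JDBC table. Below is a minimal sketch of how a caller might exercise it, mirroring the integration test in the diff; it assumes an active `SparkSession` named `spark` and a MySQL catalog registered as `mysql` (e.g. via `spark.sql.catalog.mysql`), and the table and column names are illustrative:

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// Load the registered catalog and cast the table to the new SupportsIndex mix-in.
val catalog = Catalogs.load("mysql", spark.sessionState.conf).asInstanceOf[TableCatalog]
val table = catalog.loadTable(Identifier.of(Array.empty[String], "new_table"))
  .asInstanceOf[SupportsIndex]

// Index properties are passed through to the dialect's generated CREATE INDEX statement.
val properties = new util.Properties()
properties.put("KEY_BLOCK_SIZE", "10")

if (!table.indexExists("i1")) {
  table.createIndex("i1", "", Array(FieldReference("col1")),
    Array.empty[util.Map[NamedReference, util.Properties]], properties)
}
```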
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index db626df..3cb8787 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -18,11 +18,16 @@ package org.apache.spark.sql.jdbc.v2
 
 import java.sql.{Connection, SQLFeatureNotSupportedException}
+import java.util
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.types._
@@ -115,4 +120,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
+
+  override def testIndex(tbl: String): Unit = {
+    val loaded = Catalogs.load("mysql", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array.empty[String], "new_table"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable.indexExists("i1") == false)
+    assert(jdbcTable.indexExists("i2") == false)
+
+    val properties = new util.Properties();
+    properties.put("KEY_BLOCK_SIZE", "10")
+    properties.put("COMMENT", "'this is a comment'")
+    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+      Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+    jdbcTable.createIndex("i2", "",
+      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
+
+    assert(jdbcTable.indexExists("i1") == true)
+    assert(jdbcTable.indexExists("i2") == true)
+
+    val m = intercept[IndexAlreadyExistsException] {
+      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+        Array.empty[util.Map[NamedReference, util.Properties]], properties)
+    }.getMessage
+    assert(m.contains("Failed to create index: i1 in new_table"))
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 1afe26a..da57ed7 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
+
+  def testIndex(tbl: String): Unit = {}
+
+  test("SPARK-36913: Test INDEX") {
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
+      testIndex(s"$catalogName.new_table")
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index a8d55fb..24961e4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -40,14 +40,14 @@ public interface SupportsIndex extends Table {
    * @param indexName the name of the index to be created
    * @param indexType the IndexType of the index to be created
    * @param columns the columns on which index to be created
-   * @param columnProperties the properties of the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
    * @throws IndexAlreadyExistsException If the index already exists (optional)
    */
   void createIndex(String indexName,
       String indexType,
       NamedReference[] columns,
-      Map<NamedReference, Properties>[] columnProperties,
+      Map<NamedReference, Properties>[] columnsProperties,
       Properties properties) throws IndexAlreadyExistsException;
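The renamed `columnsProperties` parameter carries per-column options, one map entry per column reference. The tests in this patch always pass an empty array, so here is a hypothetical call that populates it; the option key `SORT_ORDER` is made up for illustration, and MySQL ignores column properties entirely, as noted in the dialect change at the end of this diff:

```scala
import java.util

import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// Per-column properties: one java.util.Map per indexed column.
val colProps: util.Map[NamedReference, util.Properties] = new util.HashMap()
val sortProps = new util.Properties()
sortProps.put("SORT_ORDER", "ASC") // hypothetical per-column option
colProps.put(FieldReference("col1"), sortProps)

// `table` is a SupportsIndex instance, as in the earlier sketch.
table.createIndex("i_col1", "", Array(FieldReference("col1")),
  Array(colProps), new util.Properties())
```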
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index ce48cfa..fb17725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -79,5 +79,5 @@ class PartitionsAlreadyExistException(message: String) extends AnalysisException
 class FunctionAlreadyExistsException(db: String, func: String)
   extends AnalysisException(s"Function '$func' already exists in database '$db'")
 
-class IndexAlreadyExistsException(indexName: String, table: Identifier)
-  extends AnalysisException(s"Index '$indexName' already exists in table ${table.quoted}")
+class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d49f4b0..168d16a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.time.{Instant, LocalDate}
+import java.util
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
@@ -1009,6 +1011,35 @@
     executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}")
   }
 
+  /**
+   * Create an index.
+   */
+  def createIndex(
+      conn: Connection,
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options,
+      dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+  }
+
+  /**
+   * Check if an index exists
+   */
+  def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.indexExists(conn, indexName, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
@@ -1018,4 +1049,31 @@
       statement.close()
     }
   }
+
+  def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
+    val statement = conn.createStatement
+    try {
+      statement.setQueryTimeout(options.queryTimeout)
+      statement.executeQuery(sql)
+    } finally {
+      statement.close()
+    }
+  }
+
+  def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
+    try {
+      f
+    } catch {
+      case e: Throwable => throw dialect.classifyException(message, e)
+    }
+  }
+
+  def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
+    val conn = createConnectionFactory(options)()
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
 }
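The two helpers added at the bottom of `JdbcUtils` are meant to nest: `withConnection` scopes a connection's lifetime, and `classifyException` converts dialect-specific errors into `AnalysisException`. A minimal sketch of the intended composition, assuming a configured `JDBCOptions` value (the message text and method name here are hypothetical):

```scala
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects

def indexExistsClassified(options: JDBCOptions, index: String, table: String): Boolean =
  JdbcUtils.withConnection(options) { conn =>  // opens the connection, closes it in finally
    JdbcUtils.classifyException(s"Failed index check: $index", JdbcDialects.get(options.url)) {
      JdbcUtils.indexExists(conn, index, table, options)  // dispatches to the dialect
    }
  }
```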
JdbcUtils.classifyException(s"Failed to create index: $indexName in $name", + JdbcDialects.get(jdbcOptions.url)) { + JdbcUtils.createIndex( + conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions) + } + } + } + + override def indexExists(indexName: String): Boolean = { + JdbcUtils.withConnection(jdbcOptions) { conn => + JdbcUtils.indexExists(conn, indexName, name, jdbcOptions) + } + } + + override def dropIndex(indexName: String): Boolean = { + throw new UnsupportedOperationException("dropIndex is not supported yet") + } + + override def listIndexes(): Array[TableIndex] = { + throw new UnsupportedOperationException("listIndexes is not supported yet") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index a90ab56..5667064 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.datasources.v2.jdbc -import java.sql.{Connection, SQLException} +import java.sql.SQLException import java.util import scala.collection.JavaConverters._ @@ -57,7 +57,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging override def listTables(namespace: Array[String]): Array[Identifier] = { checkNamespace(namespace) - withConnection { conn => + JdbcUtils.withConnection(options) { conn => val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = conn.getMetaData .getTables(null, schemaPattern, "%", Array("TABLE")); @@ -72,14 +72,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging checkNamespace(ident.namespace()) val writeOptions = new JdbcOptionsInWrite( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) - classifyException(s"Failed table existence check: $ident") { - withConnection(JdbcUtils.tableExists(_, writeOptions)) + JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) { + JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } override def dropTable(ident: Identifier): Boolean = { checkNamespace(ident.namespace()) - withConnection { conn => + JdbcUtils.withConnection(options) { conn => try { JdbcUtils.dropTable(conn, getTableName(ident), options) true @@ -91,8 +91,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { checkNamespace(oldIdent.namespace()) - withConnection { conn => - classifyException(s"Failed table renaming from $oldIdent to $newIdent") { + JdbcUtils.withConnection(options) { conn => + JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) { JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options) } } @@ -151,8 +151,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging val writeOptions = new JdbcOptionsInWrite(tableOptions) val caseSensitive = SQLConf.get.caseSensitiveAnalysis - withConnection { conn => - classifyException(s"Failed table creation: $ident") { + JdbcUtils.withConnection(options) { conn => + JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) { JdbcUtils.createTable(conn, 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index a90ab56..5667064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.jdbc
 
-import java.sql.{Connection, SQLException}
+import java.sql.SQLException
 import java.util
 
 import scala.collection.JavaConverters._
@@ -57,7 +57,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def listTables(namespace: Array[String]): Array[Identifier] = {
     checkNamespace(namespace)
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       val schemaPattern = if (namespace.length == 1) namespace.head else null
       val rs = conn.getMetaData
         .getTables(null, schemaPattern, "%", Array("TABLE"));
@@ -72,14 +72,14 @@
     checkNamespace(ident.namespace())
     val writeOptions = new JdbcOptionsInWrite(
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    classifyException(s"Failed table existence check: $ident") {
-      withConnection(JdbcUtils.tableExists(_, writeOptions))
+    JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) {
+      JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
     }
   }
 
   override def dropTable(ident: Identifier): Boolean = {
     checkNamespace(ident.namespace())
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       try {
         JdbcUtils.dropTable(conn, getTableName(ident), options)
         true
@@ -91,8 +91,8 @@
 
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     checkNamespace(oldIdent.namespace())
-    withConnection { conn =>
-      classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) {
         JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
       }
     }
@@ -151,8 +151,8 @@
     val writeOptions = new JdbcOptionsInWrite(tableOptions)
     val caseSensitive = SQLConf.get.caseSensitiveAnalysis
-    withConnection { conn =>
-      classifyException(s"Failed table creation: $ident") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) {
         JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
       }
     }
@@ -162,8 +162,8 @@
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     checkNamespace(ident.namespace())
-    withConnection { conn =>
-      classifyException(s"Failed table altering: $ident") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) {
         JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
       }
       loadTable(ident)
@@ -172,7 +172,7 @@
 
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
     case Array(db) =>
-      withConnection { conn =>
+      JdbcUtils.withConnection(options) { conn =>
        val rs = conn.getMetaData.getSchemas(null, db)
        while (rs.next()) {
          if (rs.getString(1) == db) return true;
@@ -183,7 +183,7 @@
 
   override def listNamespaces(): Array[Array[String]] = {
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       val schemaBuilder = ArrayBuilder.make[Array[String]]
       val rs = conn.getMetaData.getSchemas()
       while (rs.next()) {
@@ -234,8 +234,8 @@
         }
       }
     }
-    withConnection { conn =>
-      classifyException(s"Failed create name space: $db") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed create name space: $db", dialect) {
        JdbcUtils.createNamespace(conn, options, db, comment)
       }
     }
@@ -253,7 +253,7 @@
     changes.foreach {
       case set: NamespaceChange.SetProperty =>
         if (set.property() == SupportsNamespaces.PROP_COMMENT) {
-          withConnection { conn =>
+          JdbcUtils.withConnection(options) { conn =>
             JdbcUtils.createNamespaceComment(conn, options, db, set.value)
           }
         } else {
@@ -262,7 +262,7 @@
 
       case unset: NamespaceChange.RemoveProperty =>
         if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
-          withConnection { conn =>
+          JdbcUtils.withConnection(options) { conn =>
             JdbcUtils.removeNamespaceComment(conn, options, db)
           }
         } else {
@@ -283,8 +283,8 @@
         if (listTables(Array(db)).nonEmpty) {
           throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
         }
-        withConnection { conn =>
-          classifyException(s"Failed drop name space: $db") {
+        JdbcUtils.withConnection(options) { conn =>
+          JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
             JdbcUtils.dropNamespace(conn, options, db)
             true
           }
@@ -301,24 +301,7 @@
     }
   }
 
-  private def withConnection[T](f: Connection => T): T = {
-    val conn = JdbcUtils.createConnectionFactory(options)()
-    try {
-      f(conn)
-    } finally {
-      conn.close()
-    }
-  }
-
   private def getTableName(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
   }
-
-  private def classifyException[T](message: String)(f: => T): T = {
-    try {
-      f
-    } catch {
-      case e: Throwable => throw dialect.classifyException(message, e)
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index aa95711..d1c4f8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
 import java.time.{Instant, LocalDate}
+import java.util
 
 import scala.collection.mutable.ArrayBuilder
 
@@ -30,8 +31,9 @@
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -288,6 +290,43 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
+   * Creates an index.
+   *
+   * @param indexName the name of the index to be created
+   * @param indexType the type of the index to be created
+   * @param tableName the table on which index to be created
+   * @param columns the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
+   * @param properties the properties of the index to be created
+   */
+  def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    throw new UnsupportedOperationException("createIndex is not supported")
+  }
+
+  /**
+   * Checks whether an index exists
+   *
+   * @param indexName the name of the index
+   * @param tableName the table name on which index to be checked
+   * @param options JDBCOptions of the table
+   * @return true if the index with `indexName` exists in the table with `tableName`,
+   *         false otherwise
+   */
+  def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    throw new UnsupportedOperationException("indexExists is not supported")
+  }
+
+  /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
    * @param e The dialect specific exception.
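Dialects opt into index support by overriding both hooks: `createIndex` returns the DDL string that `JdbcUtils.createIndex` then executes, while `indexExists` probes the database itself. A hypothetical minimal override for a database not covered by this patch (the real MySQL implementation follows; `getIndexInfo` is standard JDBC metadata, though the right probe varies by database):

```scala
import java.sql.Connection
import java.util

import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcDialect

private case object SomeDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:some")

  // Return the DDL; JdbcUtils.createIndex executes it over the open connection.
  override def createIndex(
      indexName: String,
      indexType: String,
      tableName: String,
      columns: Array[NamedReference],
      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
      properties: util.Properties): String = {
    val cols = columns.map(col => quoteIdentifier(col.fieldNames.head)).mkString(", ")
    s"CREATE INDEX ${quoteIdentifier(indexName)} ON ${quoteIdentifier(tableName)} ($cols)"
  }

  // Probe JDBC metadata for a matching index name.
  override def indexExists(
      conn: Connection,
      indexName: String,
      tableName: String,
      options: JDBCOptions): Boolean = {
    val rs = conn.getMetaData.getIndexInfo(null, null, tableName, false, false)
    var found = false
    while (!found && rs.next()) {
      found = indexName.equalsIgnoreCase(rs.getString("INDEX_NAME"))
    }
    found
  }
}
```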
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index ed10770..5c16ef6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -17,14 +17,21 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{Connection, SQLException, Types}
+import java.util
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
 
-private case object MySQLDialect extends JdbcDialect {
+private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
   override def canHandle(url : String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
@@ -102,4 +109,65 @@ private case object MySQLDialect extends JdbcDialect {
     case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
     case _ => JdbcUtils.getCommonJDBCType(dt)
   }
+
+  // CREATE INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
+  override def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    var indexProperties: String = ""
+    val scalaProps = properties.asScala
+    if (!properties.isEmpty) {
+      scalaProps.foreach { case (k, v) =>
+        indexProperties = indexProperties + " " + s"$k $v"
+      }
+    }
+
+    // columnsProperties doesn't apply to MySQL so it is ignored
+    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
+      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) $indexProperties"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}"
+    try {
+      val rs = JdbcUtils.executeQuery(conn, options, sql)
+      while (rs.next()) {
+        val retrievedIndexName = rs.getString("key_name")
+        if (conf.resolver(retrievedIndexName, indexName)) {
+          return true
+        }
+      }
+      false
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieve index info.")
+        false
+    }
+  }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    if (e.isInstanceOf[SQLException]) {
+      // Error codes are from
+      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
+      e.asInstanceOf[SQLException].getErrorCode match {
+        // ER_DUP_KEYNAME
+        case 1061 =>
+          throw new IndexAlreadyExistsException(message, cause = Some(e))
+        case _ =>
+      }
+    }
+    super.classifyException(message, e)
+  }
 }
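For the first index created in the integration test above, the MySQL dialect assembles roughly the following statement. This is a sketch of the output, not a verified trace: property order follows `java.util.Properties` iteration order and is not guaranteed, and the empty `indexType` leaves a doubled space after `CREATE`:

```scala
// MySQLDialect is private to org.apache.spark.sql.jdbc; shown here only to
// illustrate the string it builds for the test's "i1" index.
MySQLDialect.createIndex("i1", "", "new_table", Array(FieldReference("col1")),
  Array.empty[java.util.Map[NamedReference, java.util.Properties]], properties)
// => CREATE  INDEX `i1` ON `new_table` (`col1`)  KEY_BLOCK_SIZE 10 COMMENT 'this is a comment'
```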
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org