This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 07ecbc4  [SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)
07ecbc4 is described below

commit 07ecbc4049aa7f8daa11e6a924c37c1db2f53c73
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Fri Oct 8 11:30:12 2021 -0700

    [SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)
    
    ### What changes were proposed in this pull request?
    Implement `createIndex` and `indexExists` in DS V2 JDBC.
    
    ### Why are the changes needed?
    This is a subtask of the V2 index support. I am implementing index support for
    DS V2 JDBC so we can have a POC and end-to-end testing. This PR implements
    `createIndex` and `indexExists`; the next PR will implement `listIndexes` and
    `dropIndex`. I intentionally kept the PR small so it's easier to review.
    
    Indexes are not supported by the H2 database, and CREATE/DROP INDEX are not
    standard SQL syntax, so this PR only implements `createIndex` and
    `indexExists` in the `MySQL` dialect.
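
    For MySQL, the dialect turns these calls into native CREATE INDEX / SHOW INDEXES
    statements. Below is a minimal sketch of the dialect-level API added here (the
    JDBC URL is hypothetical; `indexType` is left empty to create a plain secondary
    index):

    ```scala
    import java.util

    import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
    import org.apache.spark.sql.jdbc.JdbcDialects

    // JdbcDialects.get resolves MySQLDialect from the jdbc:mysql URL prefix (URL is hypothetical).
    val dialect = JdbcDialects.get("jdbc:mysql://127.0.0.1:3306/db")

    // Index properties are appended verbatim to the generated statement.
    val props = new util.Properties()
    props.put("KEY_BLOCK_SIZE", "10")

    // Returns the SQL string that JdbcUtils.createIndex will execute, roughly:
    //   CREATE  INDEX `i1` ON `new_table` (`col1`)  KEY_BLOCK_SIZE 10
    val createSql = dialect.createIndex("i1", "", "new_table",
      Array(FieldReference("col1")),
      Array.empty[util.Map[NamedReference, util.Properties]], props)
    ```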
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `createIndex`/`indexExists` in DS V2 JDBC.
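
    For example, through the catalog API (a sketch based on the new integration test;
    it assumes a JDBC catalog registered as `mysql`, an existing table `new_table`
    with an INT column `col1`, and a `conf` value holding the active SQLConf with the
    spark.sql.catalog.mysql.* settings):

    ```scala
    import java.util

    import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex
    import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

    // `conf` is assumed to be the active SQLConf carrying the catalog configuration.
    // Load the JDBC catalog and look the table up as a SupportsIndex table.
    val jdbcTable = Catalogs.load("mysql", conf)
      .asInstanceOf[TableCatalog]
      .loadTable(Identifier.of(Array.empty[String], "new_table"))
      .asInstanceOf[SupportsIndex]

    // Create the index only if it does not exist yet.
    if (!jdbcTable.indexExists("i1")) {
      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
        Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties())
    }
    ```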
    
    ### How was this patch tested?
    New test.
    
    Closes #34164 from huaxingao/createIndexJDBC.
    
    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 33 ++++++++++
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  |  9 +++
 .../sql/connector/catalog/index/SupportsIndex.java |  4 +-
 .../catalyst/analysis/AlreadyExistException.scala  |  4 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 58 +++++++++++++++++
 .../execution/datasources/v2/jdbc/JDBCTable.scala  | 36 ++++++++++-
 .../datasources/v2/jdbc/JDBCTableCatalog.scala     | 55 ++++++----------
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 41 +++++++++++-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 74 +++++++++++++++++++++-
 9 files changed, 268 insertions(+), 46 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index db626df..3cb8787 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -18,11 +18,16 @@
 package org.apache.spark.sql.jdbc.v2
 
 import java.sql.{Connection, SQLFeatureNotSupportedException}
+import java.util
 
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
 import org.apache.spark.sql.types._
@@ -115,4 +120,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
     val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
   }
+
+  override def testIndex(tbl: String): Unit = {
+    val loaded = Catalogs.load("mysql", conf)
+    val jdbcTable = loaded.asInstanceOf[TableCatalog]
+      .loadTable(Identifier.of(Array.empty[String], "new_table"))
+      .asInstanceOf[SupportsIndex]
+    assert(jdbcTable.indexExists("i1") == false)
+    assert(jdbcTable.indexExists("i2") == false)
+
+    val properties = new util.Properties();
+    properties.put("KEY_BLOCK_SIZE", "10")
+    properties.put("COMMENT", "'this is a comment'")
+    jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+      Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+    jdbcTable.createIndex("i2", "",
+      Array(FieldReference("col2"), FieldReference("col3"), 
FieldReference("col5")),
+      Array.empty[util.Map[NamedReference, util.Properties]], new 
util.Properties)
+
+    assert(jdbcTable.indexExists("i1") == true)
+    assert(jdbcTable.indexExists("i2") == true)
+
+    val m = intercept[IndexAlreadyExistsException] {
+      jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+        Array.empty[util.Map[NamedReference, util.Properties]], properties)
+    }.getMessage
+    assert(m.contains("Failed to create index: i1 in new_table"))
+  }
 }
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 1afe26a..da57ed7 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
+
+  def testIndex(tbl: String): Unit = {}
+
+  test("SPARK-36913: Test INDEX") {
+    withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, 
col4 INT, col5 INT)")
+      testIndex(s"$catalogName.new_table")
+    }
+  }
 }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index a8d55fb..24961e4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -40,14 +40,14 @@ public interface SupportsIndex extends Table {
    * @param indexName the name of the index to be created
    * @param indexType the IndexType of the index to be created
    * @param columns the columns on which index to be created
-   * @param columnProperties the properties of the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
    * @param properties the properties of the index to be created
    * @throws IndexAlreadyExistsException If the index already exists (optional)
    */
   void createIndex(String indexName,
       String indexType,
       NamedReference[] columns,
-      Map<NamedReference, Properties>[] columnProperties,
+      Map<NamedReference, Properties>[] columnsProperties,
       Properties properties)
       throws IndexAlreadyExistsException;
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index ce48cfa..fb17725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -79,5 +79,5 @@ class PartitionsAlreadyExistException(message: String) extends AnalysisException
 class FunctionAlreadyExistsException(db: String, func: String)
   extends AnalysisException(s"Function '$func' already exists in database 
'$db'")
 
-class IndexAlreadyExistsException(indexName: String, table: Identifier)
-  extends AnalysisException(s"Index '$indexName' already exists in table 
${table.quoted}")
+class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = 
None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d49f4b0..168d16a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.time.{Instant, LocalDate}
+import java.util
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
@@ -1009,6 +1011,35 @@ object JdbcUtils extends Logging with SQLConfHelper {
     executeStatement(conn, options, s"DROP SCHEMA 
${dialect.quoteIdentifier(namespace)}")
   }
 
+  /**
+   * Create an index.
+   */
+  def createIndex(
+      conn: Connection,
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties,
+      options: JDBCOptions): Unit = {
+    val dialect = JdbcDialects.get(options.url)
+    executeStatement(conn, options,
+      dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+  }
+
+  /**
+   * Check if an index exists
+   */
+  def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val dialect = JdbcDialects.get(options.url)
+    dialect.indexExists(conn, indexName, tableName, options)
+  }
+
   private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
     val statement = conn.createStatement
     try {
@@ -1018,4 +1049,31 @@ object JdbcUtils extends Logging with SQLConfHelper {
       statement.close()
     }
   }
+
+  def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
+    val statement = conn.createStatement
+    try {
+      statement.setQueryTimeout(options.queryTimeout)
+      statement.executeQuery(sql)
+    } finally {
+      statement.close()
+    }
+  }
+
+  def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
+    try {
+      f
+    } catch {
+      case e: Throwable => throw dialect.classifyException(message, e)
+    }
+  }
+
+  def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
+    val conn = createConnectionFactory(options)()
+    try {
+      f(conn)
+    } finally {
+      conn.close()
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index d88ec2f..1db938e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -23,13 +23,16 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
+import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
-  extends Table with SupportsRead with SupportsWrite {
+  extends Table with SupportsRead with SupportsWrite with SupportsIndex {
 
   override def name(): String = ident.toString
 
@@ -48,4 +51,33 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
       jdbcOptions.parameters.originalMap ++ info.options.asCaseSensitiveMap().asScala)
     JDBCWriteBuilder(schema, mergedOptions)
   }
+
+  override def createIndex(
+      indexName: String,
+      indexType: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): Unit = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to create index: $indexName in 
$name",
+        JdbcDialects.get(jdbcOptions.url)) {
+        JdbcUtils.createIndex(
+          conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+      }
+    }
+  }
+
+  override def indexExists(indexName: String): Boolean = {
+    JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+    }
+  }
+
+  override def dropIndex(indexName: String): Boolean = {
+    throw new UnsupportedOperationException("dropIndex is not supported yet")
+  }
+
+  override def listIndexes(): Array[TableIndex] = {
+    throw new UnsupportedOperationException("listIndexes is not supported yet")
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index a90ab56..5667064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.jdbc
 
-import java.sql.{Connection, SQLException}
+import java.sql.SQLException
 import java.util
 
 import scala.collection.JavaConverters._
@@ -57,7 +57,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def listTables(namespace: Array[String]): Array[Identifier] = {
     checkNamespace(namespace)
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       val schemaPattern = if (namespace.length == 1) namespace.head else null
       val rs = conn.getMetaData
         .getTables(null, schemaPattern, "%", Array("TABLE"));
@@ -72,14 +72,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
     checkNamespace(ident.namespace())
     val writeOptions = new JdbcOptionsInWrite(
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    classifyException(s"Failed table existence check: $ident") {
-      withConnection(JdbcUtils.tableExists(_, writeOptions))
+    JdbcUtils.classifyException(s"Failed table existence check: $ident", 
dialect) {
+      JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
     }
   }
 
   override def dropTable(ident: Identifier): Boolean = {
     checkNamespace(ident.namespace())
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       try {
         JdbcUtils.dropTable(conn, getTableName(ident), options)
         true
@@ -91,8 +91,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     checkNamespace(oldIdent.namespace())
-    withConnection { conn =>
-      classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to 
$newIdent", dialect) {
         JdbcUtils.renameTable(conn, getTableName(oldIdent), 
getTableName(newIdent), options)
       }
     }
@@ -151,8 +151,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
     val writeOptions = new JdbcOptionsInWrite(tableOptions)
     val caseSensitive = SQLConf.get.caseSensitiveAnalysis
-    withConnection { conn =>
-      classifyException(s"Failed table creation: $ident") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) {
         JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
       }
     }
@@ -162,8 +162,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     checkNamespace(ident.namespace())
-    withConnection { conn =>
-      classifyException(s"Failed table altering: $ident") {
+    JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) {
         JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
       }
       loadTable(ident)
@@ -172,7 +172,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
     case Array(db) =>
-      withConnection { conn =>
+      JdbcUtils.withConnection(options) { conn =>
         val rs = conn.getMetaData.getSchemas(null, db)
         while (rs.next()) {
           if (rs.getString(1) == db) return true;
@@ -183,7 +183,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
   }
 
   override def listNamespaces(): Array[Array[String]] = {
-    withConnection { conn =>
+    JdbcUtils.withConnection(options) { conn =>
       val schemaBuilder = ArrayBuilder.make[Array[String]]
       val rs = conn.getMetaData.getSchemas()
       while (rs.next()) {
@@ -234,8 +234,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
           }
         }
       }
-      withConnection { conn =>
-        classifyException(s"Failed create name space: $db") {
+      JdbcUtils.withConnection(options) { conn =>
+        JdbcUtils.classifyException(s"Failed create name space: $db", dialect) 
{
           JdbcUtils.createNamespace(conn, options, db, comment)
         }
       }
@@ -253,7 +253,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
         changes.foreach {
           case set: NamespaceChange.SetProperty =>
             if (set.property() == SupportsNamespaces.PROP_COMMENT) {
-              withConnection { conn =>
+              JdbcUtils.withConnection(options) { conn =>
                 JdbcUtils.createNamespaceComment(conn, options, db, set.value)
               }
             } else {
@@ -262,7 +262,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
 
           case unset: NamespaceChange.RemoveProperty =>
             if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
-              withConnection { conn =>
+              JdbcUtils.withConnection(options) { conn =>
                 JdbcUtils.removeNamespaceComment(conn, options, db)
               }
             } else {
@@ -283,8 +283,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       if (listTables(Array(db)).nonEmpty) {
         throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
       }
-      withConnection { conn =>
-        classifyException(s"Failed drop name space: $db") {
+      JdbcUtils.withConnection(options) { conn =>
+        JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
           JdbcUtils.dropNamespace(conn, options, db)
           true
         }
@@ -301,24 +301,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
     }
   }
 
-  private def withConnection[T](f: Connection => T): T = {
-    val conn = JdbcUtils.createConnectionFactory(options)()
-    try {
-      f(conn)
-    } finally {
-      conn.close()
-    }
-  }
-
   private def getTableName(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
   }
-
-  private def classifyException[T](message: String)(f: => T): T = {
-    try {
-      f
-    } catch {
-      case e: Throwable => throw dialect.classifyException(message, e)
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index aa95711..d1c4f8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
 import java.time.{Instant, LocalDate}
+import java.util
 
 import scala.collection.mutable.ArrayBuilder
 
@@ -30,8 +31,9 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -288,6 +290,43 @@ abstract class JdbcDialect extends Serializable with Logging{
   }
 
   /**
+   * Creates an index.
+   *
+   * @param indexName         the name of the index to be created
+   * @param indexType         the type of the index to be created
+   * @param tableName         the table on which index to be created
+   * @param columns           the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
+   * @param properties        the properties of the index to be created
+   */
+  def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    throw new UnsupportedOperationException("createIndex is not supported")
+  }
+
+  /**
+   * Checks whether an index exists
+   *
+   * @param indexName the name of the index
+   * @param tableName the table name on which index to be checked
+   * @param options JDBCOptions of the table
+   * @return true if the index with `indexName` exists in the table with `tableName`,
+   *         false otherwise
+   */
+  def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    throw new UnsupportedOperationException("indexExists is not supported")
+  }
+
+  /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param message The error message to be placed to the returned exception.
    * @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index ed10770..5c16ef6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -17,14 +17,21 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{Connection, SQLException, Types}
+import java.util
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
 
-private case object MySQLDialect extends JdbcDialect {
+private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
   override def canHandle(url : String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
@@ -102,4 +109,65 @@ private case object MySQLDialect extends JdbcDialect {
     case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
     case _ => JdbcUtils.getCommonJDBCType(dt)
   }
+
+  // CREATE INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
+  override def createIndex(
+      indexName: String,
+      indexType: String,
+      tableName: String,
+      columns: Array[NamedReference],
+      columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+      properties: util.Properties): String = {
+    val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+    var indexProperties: String = ""
+    val scalaProps = properties.asScala
+    if (!properties.isEmpty) {
+      scalaProps.foreach { case (k, v) =>
+        indexProperties = indexProperties + " " + s"$k $v"
+      }
+    }
+
+    // columnsProperties doesn't apply to MySQL so it is ignored
+    s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
+      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) 
$indexProperties"
+  }
+
+  // SHOW INDEX syntax
+  // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+  override def indexExists(
+      conn: Connection,
+      indexName: String,
+      tableName: String,
+      options: JDBCOptions): Boolean = {
+    val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}"
+    try {
+      val rs = JdbcUtils.executeQuery(conn, options, sql)
+      while (rs.next()) {
+        val retrievedIndexName = rs.getString("key_name")
+        if (conf.resolver(retrievedIndexName, indexName)) {
+          return true
+        }
+      }
+      false
+    } catch {
+      case _: Exception =>
+        logWarning("Cannot retrieved index info.")
+        false
+    }
+  }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    if (e.isInstanceOf[SQLException]) {
+      // Error codes are from
+      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
+      e.asInstanceOf[SQLException].getErrorCode match {
+        // ER_DUP_KEYNAME
+        case 1061 =>
+          throw new IndexAlreadyExistsException(message, cause = Some(e))
+        case _ =>
+      }
+    }
+    super.classifyException(message, e)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
