This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d8a4a8c [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type d8a4a8c is described below commit d8a4a8c629db6ae8081477e58fdbf20983b48a39 Author: Huaxin Gao <huaxin_...@apple.com> AuthorDate: Tue Nov 9 04:21:29 2021 +0900 [SPARK-36895][SQL][FOLLOWUP] Use property to specify index type ### What changes were proposed in this pull request? use property to specify index type ### Why are the changes needed? fix scala doc in https://github.com/apache/spark/pull/34486 and resubmit ### Does this PR introduce _any_ user-facing change? Yes ``` void createIndex(String indexName, String indexType, NamedReference[] columns, Map<NamedReference, Map<String, String>> columnsProperties, Map<String, String> properties) ``` changed to ``` void createIndex(String indexName, NamedReference[] columns, Map<NamedReference, Map<String, String>> columnsProperties, Map<String, String> properties) ``` ### How was this patch tested? new test Closes #34523 from huaxingao/newDelete. 
Authored-by: Huaxin Gao <huaxin_...@apple.com> Signed-off-by: Kousuke Saruta <saru...@oss.nttdata.com> --- .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 67 ------------------ .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 82 +++++----------------- .../sql/connector/catalog/index/SupportsIndex.java | 8 ++- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 3 +- .../execution/datasources/v2/CreateIndexExec.scala | 9 ++- .../execution/datasources/v2/jdbc/JDBCTable.scala | 3 +- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 - .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 27 ++++--- 8 files changed, 45 insertions(+), 156 deletions(-) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index d77dcb4..592f7d6 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -18,16 +18,11 @@ package org.apache.spark.sql.jdbc.v2 import java.sql.{Connection, SQLFeatureNotSupportedException} -import java.util import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkConf import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} -import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog} -import org.apache.spark.sql.connector.catalog.index.SupportsIndex -import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite} import org.apache.spark.sql.types._ @@ -122,66 +117,4 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite with V2JDBCTest { } override def supportsIndex: Boolean = true - - override def testIndexProperties(jdbcTable: SupportsIndex): Unit = { - val properties = new util.HashMap[String, String](); - properties.put("KEY_BLOCK_SIZE", "10") - properties.put("COMMENT", "'this is a comment'") - // MySQL doesn't allow property set on individual column, so use empty Array for - // column properties - jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")), - new util.HashMap[NamedReference, util.Map[String, String]](), properties) - - var index = jdbcTable.listIndexes() - // The index property size is actually 1. Even though the index is created - // with properties "KEY_BLOCK_SIZE", "10" and "COMMENT", "'this is a comment'", when - // retrieving index using `SHOW INDEXES`, MySQL only returns `COMMENT`. - assert(index(0).properties.size == 1) - assert(index(0).properties.get("COMMENT").equals("this is a comment")) - } - - override def testIndexUsingSQL(tbl: String): Unit = { - val loaded = Catalogs.load("mysql", conf) - val jdbcTable = loaded.asInstanceOf[TableCatalog] - .loadTable(Identifier.of(Array.empty[String], "new_table")) - .asInstanceOf[SupportsIndex] - assert(jdbcTable.indexExists("i1") == false) - assert(jdbcTable.indexExists("i2") == false) - - val indexType = "DUMMY" - var m = intercept[UnsupportedOperationException] { - sql(s"CREATE index i1 ON $catalogName.new_table USING DUMMY (col1)") - }.getMessage - assert(m.contains(s"Index Type $indexType is not supported." 
+ - s" The supported Index Types are: BTREE and HASH")) - - sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") - sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" + - s" OPTIONS (KEY_BLOCK_SIZE=10)") - - assert(jdbcTable.indexExists("i1") == true) - assert(jdbcTable.indexExists("i2") == true) - - // This should pass without exception - sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)") - - m = intercept[IndexAlreadyExistsException] { - sql(s"CREATE index i1 ON $catalogName.new_table (col1)") - }.getMessage - assert(m.contains("Failed to create index i1 in new_table")) - - sql(s"DROP index i1 ON $catalogName.new_table") - sql(s"DROP index i2 ON $catalogName.new_table") - - assert(jdbcTable.indexExists("i1") == false) - assert(jdbcTable.indexExists("i2") == false) - - // This should pass without exception - sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table") - - m = intercept[NoSuchIndexException] { - sql(s"DROP index i1 ON $catalogName.new_table") - }.getMessage - assert(m.contains("Failed to drop index i1 in new_table")) - } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 717624b..d292051 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.jdbc.v2 -import java.util - import org.apache.log4j.Level import org.apache.spark.sql.AnalysisException @@ -27,7 +25,6 @@ import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSu import org.apache.spark.sql.catalyst.plans.logical.{Filter, Sample} import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog} import 
org.apache.spark.sql.connector.catalog.index.SupportsIndex -import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper} import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite import org.apache.spark.sql.test.SharedSparkSession @@ -193,14 +190,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu } def supportsIndex: Boolean = false - def testIndexProperties(jdbcTable: SupportsIndex): Unit = {} - def testIndexUsingSQL(tbl: String): Unit = {} - test("SPARK-36913: Test INDEX") { + test("SPARK-36895: Test INDEX Using SQL") { if (supportsIndex) { withTable(s"$catalogName.new_table") { sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT," + - s" col4 INT, col5 INT)") + " col4 INT, col5 INT)") val loaded = Catalogs.load(catalogName, conf) val jdbcTable = loaded.asInstanceOf[TableCatalog] .loadTable(Identifier.of(Array.empty[String], "new_table")) @@ -208,88 +203,45 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(jdbcTable.indexExists("i1") == false) assert(jdbcTable.indexExists("i2") == false) - val properties = new util.HashMap[String, String](); val indexType = "DUMMY" var m = intercept[UnsupportedOperationException] { - jdbcTable.createIndex("i1", indexType, Array(FieldReference("col1")), - new util.HashMap[NamedReference, util.Map[String, String]](), properties) + sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") }.getMessage assert(m.contains(s"Index Type $indexType is not supported." 
+ s" The supported Index Types are: BTREE and HASH")) - jdbcTable.createIndex("i1", "BTREE", Array(FieldReference("col1")), - new util.HashMap[NamedReference, util.Map[String, String]](), properties) - - jdbcTable.createIndex("i2", "", - Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")), - new util.HashMap[NamedReference, util.Map[String, String]](), properties) + sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") + sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" + + s" OPTIONS (KEY_BLOCK_SIZE=10)") assert(jdbcTable.indexExists("i1") == true) assert(jdbcTable.indexExists("i2") == true) + // This should pass without exception + sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)") + m = intercept[IndexAlreadyExistsException] { - jdbcTable.createIndex("i1", "", Array(FieldReference("col1")), - new util.HashMap[NamedReference, util.Map[String, String]](), properties) + sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }.getMessage assert(m.contains("Failed to create index i1 in new_table")) - var index = jdbcTable.listIndexes() - assert(index.length == 2) - - assert(index(0).indexName.equals("i1")) - assert(index(0).indexType.equals("BTREE")) - var cols = index(0).columns - assert(cols.length == 1) - assert(cols(0).describe().equals("col1")) - assert(index(0).properties.size == 0) - - assert(index(1).indexName.equals("i2")) - assert(index(1).indexType.equals("BTREE")) - cols = index(1).columns - assert(cols.length == 3) - assert(cols(0).describe().equals("col2")) - assert(cols(1).describe().equals("col3")) - assert(cols(2).describe().equals("col5")) - assert(index(1).properties.size == 0) - - jdbcTable.dropIndex("i1") - assert(jdbcTable.indexExists("i1") == false) - assert(jdbcTable.indexExists("i2") == true) - - index = jdbcTable.listIndexes() - assert(index.length == 1) - - assert(index(0).indexName.equals("i2")) - assert(index(0).indexType.equals("BTREE")) - cols = 
index(0).columns - assert(cols.length == 3) - assert(cols(0).describe().equals("col2")) - assert(cols(1).describe().equals("col3")) - assert(cols(2).describe().equals("col5")) + sql(s"DROP index i1 ON $catalogName.new_table") + sql(s"DROP index i2 ON $catalogName.new_table") - jdbcTable.dropIndex("i2") assert(jdbcTable.indexExists("i1") == false) assert(jdbcTable.indexExists("i2") == false) - index = jdbcTable.listIndexes() - assert(index.length == 0) + + // This should pass without exception + sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table") m = intercept[NoSuchIndexException] { - jdbcTable.dropIndex("i2") + sql(s"DROP index i1 ON $catalogName.new_table") }.getMessage - assert(m.contains("Failed to drop index i2 in new_table")) - - testIndexProperties(jdbcTable) + assert(m.contains("Failed to drop index i1 in new_table")) } } } - test("SPARK-36895: Test INDEX Using SQL") { - withTable(s"$catalogName.new_table") { - sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)") - testIndexUsingSQL(s"$catalogName.new_table") - } - } - def supportsTableSample: Boolean = false private def samplePushed(df: DataFrame): Boolean = { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java index 9cf39eb..734b290 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java @@ -34,18 +34,20 @@ import org.apache.spark.sql.connector.expressions.NamedReference; public interface SupportsIndex extends Table { /** + * A reserved property to specify the index type. + */ + String PROP_TYPE = "type"; + + /** * Creates an index. * * @param indexName the name of the index to be created - * @param indexType the type of the index to be created. 
If this is not specified, Spark - * will use empty String. * @param columns the columns on which index to be created * @param columnsProperties the properties of the columns on which index to be created * @param properties the properties of the index to be created * @throws IndexAlreadyExistsException If the index already exists. */ void createIndex(String indexName, - String indexType, NamedReference[] columns, Map<NamedReference, Map<String, String>> columnsProperties, Map<String, String> properties) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 4402f27..526f9d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1018,7 +1018,6 @@ object JdbcUtils extends Logging with SQLConfHelper { def createIndex( conn: Connection, indexName: String, - indexType: String, tableName: String, columns: Array[NamedReference], columnsProperties: util.Map[NamedReference, util.Map[String, String]], @@ -1026,7 +1025,7 @@ object JdbcUtils extends Logging with SQLConfHelper { options: JDBCOptions): Unit = { val dialect = JdbcDialects.get(options.url) executeStatement(conn, options, - dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties)) + dialect.createIndex(indexName, tableName, columns, columnsProperties, properties)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala index 78bdf64..20ccf99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateIndexExec.scala @@ -39,13 
+39,20 @@ case class CreateIndexExec( properties: Map[String, String]) extends LeafV2CommandExec { override protected def run(): Seq[InternalRow] = { + + val propertiesWithIndexType: Map[String, String] = if (indexType.nonEmpty) { + properties + (SupportsIndex.PROP_TYPE -> indexType) + } else { + properties + } + val colProperties = new util.HashMap[NamedReference, util.Map[String, String]] columns.foreach { case (column, map) => colProperties.put(column, map.asJava) } try { table.createIndex( - indexName, indexType, columns.unzip._1.toArray, colProperties, properties.asJava) + indexName, columns.unzip._1.toArray, colProperties, propertiesWithIndexType.asJava) } catch { case _: IndexAlreadyExistsException if ignoreIfExists => logWarning(s"Index $indexName already exists in table ${table.name}. Ignoring.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 304431d..31c0167 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -54,7 +54,6 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt override def createIndex( indexName: String, - indexType: String, columns: Array[NamedReference], columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): Unit = { @@ -62,7 +61,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt JdbcUtils.classifyException(s"Failed to create index $indexName in $name", JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.createIndex( - conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions) + conn, indexName, name, columns, columnsProperties, properties, jdbcOptions) } } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 568318c..c7db771 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -295,7 +295,6 @@ abstract class JdbcDialect extends Serializable with Logging{ * Build a create index SQL statement. * * @param indexName the name of the index to be created - * @param indexType the type of the index to be created * @param tableName the table on which index to be created * @param columns the columns on which index to be created * @param columnsProperties the properties of the columns on which index to be created @@ -304,7 +303,6 @@ abstract class JdbcDialect extends Serializable with Logging{ */ def createIndex( indexName: String, - indexType: String, tableName: String, columns: Array[NamedReference], columnsProperties: util.Map[NamedReference, util.Map[String, String]], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 73b36f1..28e15b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException} -import org.apache.spark.sql.connector.catalog.index.TableIndex +import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} @@ -115,30 +115,29 @@ 
private case object MySQLDialect extends JdbcDialect with SQLConfHelper { // https://dev.mysql.com/doc/refman/8.0/en/create-index.html override def createIndex( indexName: String, - indexType: String, tableName: String, columns: Array[NamedReference], columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): String = { val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head)) var indexProperties: String = "" + var indexType = "" if (!properties.isEmpty) { properties.asScala.foreach { case (k, v) => - indexProperties = indexProperties + " " + s"$k $v" - } - } - val iType = if (indexType.isEmpty) { - "" - } else { - if (indexType.length > 1 && !indexType.equalsIgnoreCase("BTREE") && - !indexType.equalsIgnoreCase("HASH")) { - throw new UnsupportedOperationException(s"Index Type $indexType is not supported." + - " The supported Index Types are: BTREE and HASH") + if (k.equals(SupportsIndex.PROP_TYPE)) { + if (v.equalsIgnoreCase("BTREE") || v.equalsIgnoreCase("HASH")) { + indexType = s"USING $v" + } else { + throw new UnsupportedOperationException(s"Index Type $v is not supported." + + " The supported Index Types are: BTREE and HASH") + } + } else { + indexProperties = indexProperties + " " + s"$k $v" + } } - s"USING $indexType" } // columnsProperties doesn't apply to MySQL so it is ignored - s"CREATE INDEX ${quoteIdentifier(indexName)} $iType ON" + + s"CREATE INDEX ${quoteIdentifier(indexName)} $indexType ON" + s" ${quoteIdentifier(tableName)} (${columnList.mkString(", ")}) $indexProperties" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org