This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2710dbe6abf [SPARK-45139][SQL] Add DatabricksDialect to handle SQL type conversion
2710dbe6abf is described below

commit 2710dbe6abf0b143a18268a99635d0e6033bea78
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Wed Sep 13 02:28:56 2023 -0700

    [SPARK-45139][SQL] Add DatabricksDialect to handle SQL type conversion

    ### What changes were proposed in this pull request?

    This PR adds `DatabricksDialect` to Spark so that users can query Databricks clusters and
    Databricks SQL warehouses with precise SQL type conversion and quoted identifiers, instead
    of handling both manually in their code.

    ### Why are the changes needed?

    The PR fixes SQL type conversion for Databricks and makes it easier to query Databricks
    clusters and SQL warehouses.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    I added unit tests in `JDBCSuite` to check the type conversion.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42896 from sadikovi/add_databricks_dialect.

    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/jdbc/DatabricksDialect.scala | 93 ++++++++++++++++++++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala  |  1 +
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala     | 26 ++++++
 3 files changed, 120 insertions(+)
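For context, a minimal sketch of how the new dialect is exercised from user code follows. It is illustrative only: the workspace host, HTTP path, table name, and token are hypothetical placeholders, and it assumes the Databricks JDBC driver is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DatabricksDialectExample").getOrCreate()

// The "jdbc:databricks" URL prefix is what DatabricksDialect.canHandle matches,
// so Spark applies the dialect's type mapping and backtick identifier quoting
// automatically; no explicit dialect configuration is needed.
val df = spark.read
  .format("jdbc")
  .option("url",
    "jdbc:databricks://example.cloud.databricks.com:443;httpPath=/sql/1.0/warehouses/abc123")
  .option("dbtable", "samples.nyctaxi.trips")
  .option("user", "token")
  .option("password", "<personal-access-token>")
  .load()

df.printSchema()
```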
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
new file mode 100644
index 00000000000..1b715283dd4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+
+import scala.collection.mutable.ArrayBuilder
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
+import org.apache.spark.sql.types._
+
+private case object DatabricksDialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = {
+    url.startsWith("jdbc:databricks")
+  }
+
+  override def getCatalystType(
+      sqlType: Int,
+      typeName: String,
+      size: Int,
+      md: MetadataBuilder): Option[DataType] = {
+    sqlType match {
+      case java.sql.Types.TINYINT => Some(ByteType)
+      case java.sql.Types.SMALLINT => Some(ShortType)
+      case java.sql.Types.REAL => Some(FloatType)
+      case _ => None
+    }
+  }
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+    case DoubleType => Some(JdbcType("DOUBLE", java.sql.Types.DOUBLE))
+    case StringType => Some(JdbcType("STRING", java.sql.Types.VARCHAR))
+    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
+    case _ => None
+  }
+
+  override def quoteIdentifier(colName: String): String = {
+    s"`$colName`"
+  }
+
+  override def supportsLimit: Boolean = true
+
+  override def supportsOffset: Boolean = true
+
+  override def supportsTableSample: Boolean = true
+
+  override def getTableSample(sample: TableSampleInfo): String = {
+    s"TABLESAMPLE (${(sample.upperBound - sample.lowerBound) * 100} PERCENT) REPEATABLE (${sample.seed})"
+  }
+
+  // Override schemasExists to run "SHOW SCHEMAS" as a PreparedStatement instead of
+  // invoking getMetaData.getSchemas, as it may not work correctly in older versions of the driver.
+  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
+    val stmt = conn.prepareStatement("SHOW SCHEMAS")
+    val rs = stmt.executeQuery()
+    while (rs.next()) {
+      if (rs.getString(1) == schema) {
+        return true
+      }
+    }
+    false
+  }
+
+  // Override listSchemas to run "SHOW SCHEMAS" as a PreparedStatement instead of
+  // invoking getMetaData.getSchemas, as it may not work correctly in older versions of the driver.
+  override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
+    val schemaBuilder = ArrayBuilder.make[Array[String]]
+    val stmt = conn.prepareStatement("SHOW SCHEMAS")
+    val rs = stmt.executeQuery()
+    while (rs.next()) {
+      schemaBuilder += Array(rs.getString(1))
+    }
+    schemaBuilder.result
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d775f5fce2f..c9e47e866c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -703,6 +703,7 @@ object JdbcDialects {
   registerDialect(TeradataDialect)
   registerDialect(H2Dialect)
   registerDialect(SnowflakeDialect)
+  registerDialect(DatabricksDialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
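A brief note on the registry mechanics above: `JdbcDialects.get` returns the registered dialect whose `canHandle` accepts the connection URL, which is why the one-line registration is all `DatabricksDialect` needs to take effect. The same developer API can install a custom dialect; `MyDialect` below is a hypothetical sketch, not part of this change.

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect, shown only to illustrate the registry used above.
object MyDialect extends JdbcDialect {
  // JdbcDialects.get selects a dialect by matching the URL via canHandle.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
}

JdbcDialects.registerDialect(MyDialect)
assert(JdbcDialects.get("jdbc:mydb://host:5432/db") == MyDialect)
JdbcDialects.unregisterDialect(MyDialect)
```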
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 6f60e0f7394..1b7a92b7199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -2062,4 +2062,30 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val snowflakeDialect = JdbcDialects.get("jdbc:snowflake://account.snowflakecomputing.com")
     assert(snowflakeDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
   }
+
+  test("SPARK-45139: DatabricksDialect url handling") {
+    assert(JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com") == DatabricksDialect)
+  }
+
+  test("SPARK-45139: DatabricksDialect catalyst type mapping") {
+    val databricksDialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
+    assert(databricksDialect
+      .getCatalystType(java.sql.Types.TINYINT, "", 1, null) == Some(ByteType))
+    assert(databricksDialect
+      .getCatalystType(java.sql.Types.SMALLINT, "", 1, null) == Some(ShortType))
+    assert(databricksDialect
+      .getCatalystType(java.sql.Types.REAL, "", 1, null) == Some(FloatType))
+  }
+
+  test("SPARK-45139: DatabricksDialect JDBC type mapping") {
+    val databricksDialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
+    assert(databricksDialect
+      .getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "BOOLEAN")
+    assert(databricksDialect
+      .getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "DOUBLE")
+    assert(databricksDialect
+      .getJDBCType(StringType).map(_.databaseTypeDefinition).get == "STRING")
+    assert(databricksDialect
+      .getJDBCType(BinaryType).map(_.databaseTypeDefinition).get == "BINARY")
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org