This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2710dbe6abf [SPARK-45139][SQL] Add DatabricksDialect to handle SQL 
type conversion
2710dbe6abf is described below

commit 2710dbe6abf0b143a18268a99635d0e6033bea78
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Wed Sep 13 02:28:56 2023 -0700

    [SPARK-45139][SQL] Add DatabricksDialect to handle SQL type conversion
    
    ### What changes were proposed in this pull request?
    
    This PR adds `DatabricksDialect` to Spark so that users can query 
Databricks clusters and Databricks SQL warehouses with more precise SQL type 
conversion and automatic identifier quoting, instead of handling both manually 
in their code.
    
    ### Why are the changes needed?
    
    The PR fixes type conversion and makes it easier to query Databricks 
clusters.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added unit tests in JDBCSuite to check conversion.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42896 from sadikovi/add_databricks_dialect.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/jdbc/DatabricksDialect.scala  | 93 ++++++++++++++++++++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  1 +
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 26 ++++++
 3 files changed, 120 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
new file mode 100644
index 00000000000..1b715283dd4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+
+import scala.collection.mutable.ArrayBuilder
+
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
+import org.apache.spark.sql.types._
+
+/**
+ * JDBC dialect selected for URLs that start with `jdbc:databricks`.
+ *
+ * It customises the SQL type mapping in both directions, quotes identifiers with
+ * backticks, reports LIMIT, OFFSET and TABLESAMPLE as supported, and answers schema
+ * queries by running `SHOW SCHEMAS` rather than going through the driver metadata API.
+ */
+private case object DatabricksDialect extends JdbcDialect {
+
+  /** Returns true when `url` starts with the `jdbc:databricks` prefix. */
+  override def canHandle(url: String): Boolean = {
+    url.startsWith("jdbc:databricks")
+  }
+
+  /**
+   * Maps the JDBC type codes this dialect treats specially to Catalyst types.
+   *
+   * @param sqlType JDBC type code from `java.sql.Types`; the only argument inspected here
+   * @param typeName not used by this dialect
+   * @param size not used by this dialect
+   * @param md not used by this dialect
+   * @return the Catalyst type for TINYINT, SMALLINT and REAL, `None` for any other code
+   *         (presumably the caller then applies its default mapping - confirm)
+   */
+  override def getCatalystType(
+      sqlType: Int,
+      typeName: String,
+      size: Int,
+      md: MetadataBuilder): Option[DataType] = {
+    sqlType match {
+      case java.sql.Types.TINYINT => Some(ByteType)
+      case java.sql.Types.SMALLINT => Some(ShortType)
+      case java.sql.Types.REAL => Some(FloatType)
+      case _ => None
+    }
+  }
+
+  /**
+   * Maps the Catalyst types this dialect treats specially to a database type definition
+   * paired with a JDBC type code.
+   *
+   * @param dt Catalyst type to translate
+   * @return a mapping for boolean, double, string and binary types, `None` for any other
+   *         type
+   */
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
+    case DoubleType => Some(JdbcType("DOUBLE", java.sql.Types.DOUBLE))
+    case StringType => Some(JdbcType("STRING", java.sql.Types.VARCHAR))
+    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
+    case _ => None
+  }
+
+  /**
+   * Wraps `colName` in backticks.
+   *
+   * A backtick inside the name is doubled so that it cannot terminate the quoted
+   * identifier early; a name without backticks is simply wrapped.
+   */
+  override def quoteIdentifier(colName: String): String = {
+    val escapedColName = colName.replace("`", "``")
+    s"`$escapedColName`"
+  }
+
+  override def supportsLimit: Boolean = true
+
+  override def supportsOffset: Boolean = true
+
+  override def supportsTableSample: Boolean = true
+
+  /**
+   * Builds the TABLESAMPLE clause for `sample`.
+   *
+   * The sampled amount is `(upperBound - lowerBound) * 100` and the seed is passed to
+   * REPEATABLE. NOTE(review): this presumes the bounds are fractions so the product is a
+   * percentage - confirm against TableSampleInfo.
+   */
+  override def getTableSample(sample: TableSampleInfo): String = {
+    s"TABLESAMPLE (${(sample.upperBound - sample.lowerBound) * 100}) REPEATABLE (${sample.seed})"
+  }
+
+  // Override schemasExists to run "show schemas" as a PreparedStatement instead of
+  // invoking getMetaData.getSchemas as it may not work correctly in older versions
+  // of the driver.
+  override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
+    val stmt = conn.prepareStatement("SHOW SCHEMAS")
+    try {
+      val rs = stmt.executeQuery()
+      try {
+        // Stop scanning at the first row whose first column equals the requested schema.
+        var found = false
+        while (!found && rs.next()) {
+          found = rs.getString(1) == schema
+        }
+        found
+      } finally {
+        rs.close()
+      }
+    } finally {
+      // Release the statement even when the query or the row scan throws.
+      stmt.close()
+    }
+  }
+
+  // Override listSchemas to run "show schemas" as a PreparedStatement instead of
+  // invoking getMetaData.getSchemas as it may not work correctly in older versions
+  // of the driver.
+  override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
+    val schemaBuilder = ArrayBuilder.make[Array[String]]
+    val stmt = conn.prepareStatement("SHOW SCHEMAS")
+    try {
+      val rs = stmt.executeQuery()
+      try {
+        // Each result row becomes a one-element array holding the value of column 1.
+        while (rs.next()) {
+          schemaBuilder += Array(rs.getString(1))
+        }
+      } finally {
+        rs.close()
+      }
+    } finally {
+      // Release the statement even when the query or the row scan throws.
+      stmt.close()
+    }
+    schemaBuilder.result()
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d775f5fce2f..c9e47e866c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -703,6 +703,7 @@ object JdbcDialects {
   registerDialect(TeradataDialect)
   registerDialect(H2Dialect)
   registerDialect(SnowflakeDialect)
+  registerDialect(DatabricksDialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 6f60e0f7394..1b7a92b7199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -2062,4 +2062,30 @@ class JDBCSuite extends QueryTest with 
SharedSparkSession {
     val snowflakeDialect = 
JdbcDialects.get("jdbc:snowflake://account.snowflakecomputing.com")
     
assert(snowflakeDialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get
 == "BOOLEAN")
   }
+
+  test("SPARK-45139: DatabricksDialect url handling") {
+    // A Databricks JDBC URL must resolve to the Databricks dialect.
+    val resolved = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
+    assert(resolved == DatabricksDialect)
+  }
+
+  test("SPARK-45139: DatabricksDialect catalyst type mapping") {
+    val dialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
+    // Each JDBC type code is paired with the Catalyst type the dialect must report.
+    Seq(
+      java.sql.Types.TINYINT -> ByteType,
+      java.sql.Types.SMALLINT -> ShortType,
+      java.sql.Types.REAL -> FloatType
+    ).foreach { case (jdbcType, catalystType) =>
+      assert(dialect.getCatalystType(jdbcType, "", 1, null) == Some(catalystType))
+    }
+  }
+
+  test("SPARK-45139: DatabricksDialect JDBC type mapping") {
+    val dialect = JdbcDialects.get("jdbc:databricks://account.cloud.databricks.com")
+    // Each Catalyst type is paired with the database type definition the dialect must emit.
+    Seq(
+      BooleanType -> "BOOLEAN",
+      DoubleType -> "DOUBLE",
+      StringType -> "STRING",
+      BinaryType -> "BINARY"
+    ).foreach { case (catalystType, definition) =>
+      assert(dialect.getJDBCType(catalystType).map(_.databaseTypeDefinition).get == definition)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to