This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d783bc6489 [SPARK-47882][SQL] createTableColumnTypes need to be mapped to database types instead of using directly
47d783bc6489 is described below

commit 47d783bc64897c85294a32d5ea2ca0ec8a655ea7
Author: Kent Yao <y...@apache.org>
AuthorDate: Wed Apr 17 20:34:16 2024 -0700

    [SPARK-47882][SQL] createTableColumnTypes need to be mapped to database types instead of using directly
    
    ### What changes were proposed in this pull request?
    
    The createTableColumnTypes option contains Spark SQL data type
    definitions, which the underlying database might not recognize; for
    example, Oracle (versions before 23c) does not support `boolean`.
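
    A minimal reproducer sketch, not taken from the patch itself: the
    DataFrame, Oracle URL, and table name below are made up for illustration,
    but `createTableColumnTypes` is the real write option involved.

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    val df = spark.range(1).selectExpr("id % 2 = 0 AS b")

    // Before this fix, the generated CREATE TABLE used the literal Spark SQL
    // type `boolean`, which Oracle versions before 23c reject.
    df.write
      .option("createTableColumnTypes", "b boolean")
      .jdbc("jdbc:oracle:thin:@//host:1521/db", "TEST_TABLE", new java.util.Properties())
    ```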
    
    ### Why are the changes needed?
    
    bugfix
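
    As a rough sketch of the change, using real Spark APIs but an illustrative
    Oracle URL: user-specified types are now resolved through the dialect's
    type mapping instead of being inlined verbatim into the DDL.

    ```scala
    import org.apache.spark.sql.jdbc.JdbcDialects
    import org.apache.spark.sql.types.BooleanType

    // Hypothetical Oracle URL; JdbcDialects.get selects OracleDialect from it.
    val dialect = JdbcDialects.get("jdbc:oracle:thin:@//host:1521/db")

    // The Oracle dialect maps BooleanType to NUMBER(1), so the emitted
    // CREATE TABLE now contains NUMBER(1) rather than the literal `boolean`.
    dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition) // Some("NUMBER(1)")
    ```
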
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new test
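
    The added JDBCSuite case asserts that, with the Oracle dialect, a
    user-specified `b boolean` column is rendered as `"b" NUMBER(1)`. It can
    be run locally with the usual sbt invocation, e.g.
    `build/sbt "sql/testOnly org.apache.spark.sql.jdbc.JDBCSuite -- -z SPARK-47882"`
    (the `-z` filter is a standard ScalaTest option).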
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #46093 from yaooqinn/SPARK-47882.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala   | 14 ++++++++------
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala   | 12 ++++++++++--
 .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala   |  8 +++++---
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index fd7be9d0ea41..c541ec16fc82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -878,16 +878,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * Compute the schema string for this RDD.
    */
   def schemaString(
+      dialect: JdbcDialect,
       schema: StructType,
       caseSensitive: Boolean,
-      url: String,
       createTableColumnTypes: Option[String] = None): String = {
     val sb = new StringBuilder()
-    val dialect = JdbcDialects.get(url)
     val userSpecifiedColTypesMap = createTableColumnTypes
-      .map(parseUserSpecifiedCreateTableColumnTypes(schema, caseSensitive, _))
+      .map(parseUserSpecifiedCreateTableColumnTypes(dialect, schema, caseSensitive, _))
       .getOrElse(Map.empty[String, String])
-    schema.fields.foreach { field =>
+    schema.foreach { field =>
       val name = dialect.quoteIdentifier(field.name)
       val typ = userSpecifiedColTypesMap
        .getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition)
@@ -903,6 +902,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * use in-place of the default data type.
    */
   private def parseUserSpecifiedCreateTableColumnTypes(
+      dialect: JdbcDialect,
       schema: StructType,
       caseSensitive: Boolean,
       createTableColumnTypes: String): Map[String, String] = {
@@ -919,7 +919,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
       }
     }
 
-    val userSchemaMap = userSchema.fields.map(f => f.name -> f.dataType.catalogString).toMap
+    val userSchemaMap = userSchema
+      .map(f => f.name -> getJdbcType(f.dataType, dialect).databaseTypeDefinition)
+      .toMap
     if (caseSensitive) userSchemaMap else CaseInsensitiveMap(userSchemaMap)
   }
 
@@ -988,7 +990,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val statement = conn.createStatement
     val dialect = JdbcDialects.get(options.url)
     val strSchema = schemaString(
-      schema, caseSensitive, options.url, options.createTableColumnTypes)
+      dialect, schema, caseSensitive, options.createTableColumnTypes)
     try {
       statement.setQueryTimeout(options.queryTimeout)
       dialect.createTable(statement, tableName, strSchema, options)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5915a44b7954..34c554f7d37e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1372,9 +1372,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") {
     val df = spark.createDataset(Seq("a", "b", "c")).toDF("order")
     val schema = JdbcUtils.schemaString(
+      JdbcDialects.get("jdbc:mysql://localhost:3306/temp"),
       df.schema,
-      df.sparkSession.sessionState.conf.caseSensitiveAnalysis,
-      "jdbc:mysql://localhost:3306/temp")
+      df.sparkSession.sessionState.conf.caseSensitiveAnalysis)
     assert(schema.contains("`order` LONGTEXT"))
   }
 
@@ -2182,4 +2182,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     dialect = JdbcDialects.get("jdbc:dummy:dummy_host:dummy_port/dummy_db")
     assert(dialect === NoopDialect)
   }
+
+  test("SPARK-47882: createTableColumnTypes need to be mapped to database 
types") {
+    val dialect = JdbcDialects.get("jdbc:oracle:dummy_host:dummy_port/dummy_db")
+    val schema = new StructType().add("b", "boolean")
+    val schemaStr =
+      JdbcUtils.schemaString(dialect, schema, caseSensitive = false, Some("b boolean"))
+    assert(schemaStr === """"b" NUMBER(1) """)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 0d9dc2f76faf..76a092b552f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -406,19 +406,21 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
 
   test("SPARK-10849: test schemaString - from createTableColumnTypes option 
values") {
     def testCreateTableColDataTypes(types: Seq[String]): Unit = {
+      val dialect = JdbcDialects.get(url1)
       val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) }
       val schema = colTypes
         .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
       val createTableColTypes =
         colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", 
")
 
-      val expectedSchemaStr =
-        colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ")
+      val expectedSchemaStr = schema.map { f =>
+          s""""${f.name}" ${JdbcUtils.getJdbcType(f.dataType, 
dialect).databaseTypeDefinition} """
+        }.mkString(", ")
 
       assert(JdbcUtils.schemaString(
+        dialect,
         schema,
         spark.sessionState.conf.caseSensitiveAnalysis,
-        url1,
         Option(createTableColTypes)) == expectedSchemaStr)
     }
 

