This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 47d783bc6489 [SPARK-47882][SQL] createTableColumnTypes need to be mapped to database types instead of using directly 47d783bc6489 is described below commit 47d783bc64897c85294a32d5ea2ca0ec8a655ea7 Author: Kent Yao <y...@apache.org> AuthorDate: Wed Apr 17 20:34:16 2024 -0700 [SPARK-47882][SQL] createTableColumnTypes need to be mapped to database types instead of using directly ### What changes were proposed in this pull request? createTableColumnTypes contains Spark SQL data type definitions. The underlying database might not recognize them; for example, Oracle versions before 23c do not support the boolean type. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes #46093 from yaooqinn/SPARK-47882. Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/execution/datasources/jdbc/JdbcUtils.scala | 14 ++++++++------ .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 12 ++++++++++-- .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 8 +++++--- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index fd7be9d0ea41..c541ec16fc82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -878,16 +878,15 @@ object JdbcUtils extends Logging with SQLConfHelper { * Compute the schema string for this RDD. 
*/ def schemaString( + dialect: JdbcDialect, schema: StructType, caseSensitive: Boolean, - url: String, createTableColumnTypes: Option[String] = None): String = { val sb = new StringBuilder() - val dialect = JdbcDialects.get(url) val userSpecifiedColTypesMap = createTableColumnTypes - .map(parseUserSpecifiedCreateTableColumnTypes(schema, caseSensitive, _)) + .map(parseUserSpecifiedCreateTableColumnTypes(dialect, schema, caseSensitive, _)) .getOrElse(Map.empty[String, String]) - schema.fields.foreach { field => + schema.foreach { field => val name = dialect.quoteIdentifier(field.name) val typ = userSpecifiedColTypesMap .getOrElse(field.name, getJdbcType(field.dataType, dialect).databaseTypeDefinition) @@ -903,6 +902,7 @@ object JdbcUtils extends Logging with SQLConfHelper { * use in-place of the default data type. */ private def parseUserSpecifiedCreateTableColumnTypes( + dialect: JdbcDialect, schema: StructType, caseSensitive: Boolean, createTableColumnTypes: String): Map[String, String] = { @@ -919,7 +919,9 @@ object JdbcUtils extends Logging with SQLConfHelper { } } - val userSchemaMap = userSchema.fields.map(f => f.name -> f.dataType.catalogString).toMap + val userSchemaMap = userSchema + .map(f => f.name -> getJdbcType(f.dataType, dialect).databaseTypeDefinition) + .toMap if (caseSensitive) userSchemaMap else CaseInsensitiveMap(userSchemaMap) } @@ -988,7 +990,7 @@ object JdbcUtils extends Logging with SQLConfHelper { val statement = conn.createStatement val dialect = JdbcDialects.get(options.url) val strSchema = schemaString( - schema, caseSensitive, options.url, options.createTableColumnTypes) + dialect, schema, caseSensitive, options.createTableColumnTypes) try { statement.setQueryTimeout(options.queryTimeout) dialect.createTable(statement, tableName, strSchema, options) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5915a44b7954..34c554f7d37e 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1372,9 +1372,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession { test("SPARK-16387: Reserved SQL words are not escaped by JDBC writer") { val df = spark.createDataset(Seq("a", "b", "c")).toDF("order") val schema = JdbcUtils.schemaString( + JdbcDialects.get("jdbc:mysql://localhost:3306/temp"), df.schema, - df.sparkSession.sessionState.conf.caseSensitiveAnalysis, - "jdbc:mysql://localhost:3306/temp") + df.sparkSession.sessionState.conf.caseSensitiveAnalysis) assert(schema.contains("`order` LONGTEXT")) } @@ -2182,4 +2182,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession { dialect = JdbcDialects.get("jdbc:dummy:dummy_host:dummy_port/dummy_db") assert(dialect === NoopDialect) } + + test("SPARK-47882: createTableColumnTypes need to be mapped to database types") { + val dialect = JdbcDialects.get("jdbc:oracle:dummy_host:dummy_port/dummy_db") + val schema = new StructType().add("b", "boolean") + val schemaStr = + JdbcUtils.schemaString(dialect, schema, caseSensitive = false, Some("b boolean")) + assert(schemaStr === """"b" NUMBER(1) """) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 0d9dc2f76faf..76a092b552f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -406,19 +406,21 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter { test("SPARK-10849: test schemaString - from createTableColumnTypes option values") { def testCreateTableColDataTypes(types: Seq[String]): Unit = { + val dialect = JdbcDialects.get(url1) val colTypes = types.zipWithIndex.map { case (t, i) => (s"col$i", t) } val schema = colTypes .foldLeft(new StructType())((schema, colType) => 
schema.add(colType._1, colType._2)) val createTableColTypes = colTypes.map { case (col, dataType) => s"$col $dataType" }.mkString(", ") - val expectedSchemaStr = - colTypes.map { case (col, dataType) => s""""$col" $dataType """ }.mkString(", ") + val expectedSchemaStr = schema.map { f => + s""""${f.name}" ${JdbcUtils.getJdbcType(f.dataType, dialect).databaseTypeDefinition} """ + }.mkString(", ") assert(JdbcUtils.schemaString( + dialect, schema, spark.sessionState.conf.caseSensitiveAnalysis, - url1, Option(createTableColTypes)) == expectedSchemaStr) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org