This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3857c16be81f [SPARK-47435][SPARK-45561][SQL][3.5] Fix overflow issue 
of MySQL UNSIGNED TINYINT caused by
3857c16be81f is described below

commit 3857c16be81f568cc5caf81f65941109bf5f2939
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Mar 19 13:37:28 2024 +0800

    [SPARK-47435][SPARK-45561][SQL][3.5] Fix overflow issue of MySQL UNSIGNED 
TINYINT caused by SPARK-45561
    
    ### What changes were proposed in this pull request?
    
    SPARK-45561 mapped java.sql.Types.TINYINT to ByteType in the MySQL dialect, 
which caused unsigned TINYINT values to overflow: the JDBC type reported is 
java.sql.Types.TINYINT regardless of whether the column is signed or unsigned.
    In this PR, we put the signedness into the column metadata so that TINYINT 
can be mapped to ByteType (signed) or ShortType (unsigned).
    
    ### Why are the changes needed?
    
    bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    Users can read MySQL UNSIGNED TINYINT values again after this PR, as in 
versions before 3.5.0; this has been broken since 3.5.1.
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45579 from yaooqinn/SPARK-47435-B.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     |  9 ++--
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala    |  9 ++--
 .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala  |  6 ++-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 14 ++++--
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  9 ++--
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala     |  9 ++--
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 28 +++++++----
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  6 +--
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 10 ++--
 .../v2/jdbc/JDBCTableCatalogSuite.scala            | 54 +++++++++++++---------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 26 +++++++----
 11 files changed, 115 insertions(+), 65 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 20fdc965874f..dcf4225d522d 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -56,10 +56,11 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 
     conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits 
BIT(10), "
       + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci 
DECIMAL(40,20), flt FLOAT, "
-      + "dbl DOUBLE, tiny TINYINT)").executeUpdate()
+      + "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate()
+
     conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
       + "17, 77777, 123456789, 123456789012345, 
123456789012345.123456789012345, "
-      + "42.75, 1.0000000000000002, -128)").executeUpdate()
+      + "42.75, 1.0000000000000002, -128, 255)").executeUpdate()
 
     conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts 
TIMESTAMP, "
       + "yr YEAR)").executeUpdate()
@@ -89,7 +90,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     val rows = df.collect()
     assert(rows.length == 1)
     val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
+    assert(types.length == 11)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Long"))
     assert(types(2).equals("class java.lang.Integer"))
@@ -100,6 +101,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(types(7).equals("class java.lang.Double"))
     assert(types(8).equals("class java.lang.Double"))
     assert(types(9).equals("class java.lang.Byte"))
+    assert(types(10).equals("class java.lang.Short"))
     assert(rows(0).getBoolean(0) == false)
     assert(rows(0).getLong(1) == 0x225)
     assert(rows(0).getInt(2) == 17)
@@ -111,6 +113,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(rows(0).getDouble(7) == 42.75)
     assert(rows(0).getDouble(8) == 1.0000000000000002)
     assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(rows(0).getShort(10) == 0xff.toShort)
   }
 
   test("Date types") {
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index 661b1277e9f0..9a78244f5326 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -85,11 +85,13 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DoubleType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
@@ -112,7 +114,8 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('CCSID'='UNICODE')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
index fc93f5cba4c0..e451cc2b8c52 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
@@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JD
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5e340f135c85..719b858b87b6 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -83,6 +83,11 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = 
new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", true)
+    .build()
+
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -93,11 +98,13 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -145,7 +152,8 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 6b5dd043a617..33ce55b1c6c9 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -87,8 +87,9 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTes
       s"jdbc:oracle:thin:system/$oracle_password@//$ip:$port/xe"
   }
 
-  override val defaultMetadata: Metadata = new MetadataBuilder()
+  override def defaultMetadata(dataType: DataType): Metadata = new 
MetadataBuilder()
     .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType] || 
dataType.isInstanceOf[StringType])
     .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)")
     .build()
 
@@ -110,11 +111,13 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTes
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, 
super.defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", DecimalType(10, 0), true, 
super.defaultMetadata(DecimalType(10, 0)))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, 
super.defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DecimalType(19, 0), true, 
super.defaultMetadata(DecimalType(19, 0)))
     assert(t.schema === expectedSchema)
     // Update column type from LONG to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 85e85f8bf380..d2a18ff96b44 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCT
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCT
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('TABLESPACE'='pg_default')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 99f435611f2c..b8671455ac6f 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -49,18 +49,21 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
 
   def notSupportsTableComment: Boolean = false
 
-  def defaultMetadata: Metadata = new MetadataBuilder().putLong("scale", 
0).build()
+  def defaultMetadata(dataType: DataType = StringType): Metadata = new 
MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   def testUpdateColumnNullability(tbl: String): Unit = {
     sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)")
     var t = spark.table(s"$catalogName.alt_table")
     // nullable is true in the expectedSchema because Spark always sets 
nullable to true
     // regardless of the JDBC metadata 
https://github.com/apache/spark/pull/18445
-    var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata())
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
     t = spark.table(s"$catalogName.alt_table")
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val msg = intercept[AnalysisException] {
@@ -72,8 +75,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with 
DockerIntegrationFu
   def testRenameColumn(tbl: String): Unit = {
     sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
     val t = spark.table(s"$tbl")
-    val expectedSchema = new StructType().add("RENAMED", StringType, true, 
defaultMetadata)
-      .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, 
true, defaultMetadata)
+    val expectedSchema = new StructType().add("RENAMED", StringType, true, 
defaultMetadata())
+      .add("ID1", StringType, true, defaultMetadata())
+      .add("ID2", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
   }
 
@@ -83,16 +87,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
       var t = spark.table(s"$catalogName.alt_table")
-      var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+      var expectedSchema = new StructType()
+        .add("ID", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 
STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C1", StringType, true, 
defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C1", StringType, true, defaultMetadata())
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C3", StringType, true, 
defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C3", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -125,7 +132,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
       val t = spark.table(s"$catalogName.alt_table")
-      val expectedSchema = new StructType().add("C2", StringType, true, 
defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6e7298710a5d..3521a50cd2dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -269,6 +269,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val fields = new Array[StructField](ncols)
     var i = 0
     while (i < ncols) {
+      val metadata = new MetadataBuilder()
       val columnName = rsmd.getColumnLabel(i + 1)
       val dataType = rsmd.getColumnType(i + 1)
       val typeName = rsmd.getColumnTypeName(i + 1)
@@ -289,8 +290,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-      metadata.putLong("scale", fieldScale)
 
       dataType match {
         case java.sql.Types.TIME =>
@@ -302,7 +301,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
           metadata.putBoolean("rowid", true)
         case _ =>
       }
-
+      metadata.putBoolean("isSigned", isSigned)
+      metadata.putLong("scale", fieldScale)
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, 
metadata).getOrElse(
           getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, 
isTimestampNTZ))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index c96da2d42a2a..bf27b748e24e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference, NullOrdering, SortDirection}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, 
LongType, MetadataBuilder, StringType}
+import org.apache.spark.sql.types._
 
 private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
@@ -102,8 +102,12 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
       // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a 
precision of -1.
       // Explicitly converts it into StringType here.
       Some(StringType)
-    } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) {
-      Some(ByteType)
+    } else if (sqlType == Types.TINYINT) {
+      if (md.build().getBoolean("isSigned")) {
+        Some(ByteType)
+      } else {
+        Some(ShortType)
+      }
     } else None
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index eed64b873c45..078c708cc3fd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -36,7 +36,11 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
   val tempDir = Utils.createTempDir()
   val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.h2", classOf[JDBCTableCatalog].getName)
@@ -138,8 +142,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
   test("load a table") {
     val t = spark.table("h2.test.people")
     val expectedSchema = new StructType()
-      .add("NAME", VarcharType(32), true, defaultMetadata)
-      .add("ID", IntegerType, true, defaultMetadata)
+      .add("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32)))
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     Seq(
       "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`",
@@ -181,13 +185,13 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("C1", IntegerType, true, defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C2", StringType, true, defaultMetadata(StringType))
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
       t = spark.table(tableName)
-      expectedSchema = expectedSchema.add("c3", DoubleType, true, 
defaultMetadata)
+      expectedSchema = expectedSchema.add("c3", DoubleType, true, 
defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -225,8 +229,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("C", IntegerType, true, defaultMetadata)
-        .add("C0", IntegerType, true, defaultMetadata)
+        .add("C", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C0", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Rename to already existing column
       checkError(
@@ -264,7 +268,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName DROP COLUMN C1")
       sql(s"ALTER TABLE $tableName DROP COLUMN c3")
       val t = spark.table(tableName)
-      val expectedSchema = new StructType().add("C2", IntegerType, true, 
defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
@@ -293,8 +298,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", DoubleType, true, defaultMetadata)
-        .add("deptno", DoubleType, true, defaultMetadata)
+        .add("ID", DoubleType, true, defaultMetadata(DoubleType))
+        .add("deptno", DoubleType, true, defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Update not existing column
       val msg1 = intercept[AnalysisException] {
@@ -331,8 +336,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("deptno", IntegerType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("deptno", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Update nullability of not existing column
       val msg = intercept[AnalysisException] {
@@ -388,8 +393,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("c1", IntegerType, true, defaultMetadata)
-        .add("c2", IntegerType, true, defaultMetadata)
+        .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("c2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -402,8 +407,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
         expectedSchema = new StructType()
-          .add("c1", IntegerType, true, defaultMetadata)
-          .add("c3", IntegerType, true, defaultMetadata)
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+          .add("c3", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -417,7 +422,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
-        expectedSchema = new StructType().add("c1", IntegerType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -431,7 +437,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
-        expectedSchema = new StructType().add("c1", DoubleType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(DoubleType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -445,7 +452,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
-        expectedSchema = new StructType().add("c1", DoubleType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -507,8 +515,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)")
       val t = spark.table(tableName)
       val expected = new StructType()
-        .add("ID", CharType(10), true, defaultMetadata)
-        .add("deptno", VarcharType(30), true, defaultMetadata)
+        .add("ID", CharType(10), true, defaultMetadata(CharType(10)))
+        .add("deptno", VarcharType(30), true, defaultMetadata(VarcharType(30)))
       val replaced = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
       assert(t.schema === replaced)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index e151f2c0225d..f4702ee9edb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -77,7 +77,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -907,7 +910,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
   test("MySQLDialect catalyst type mapping") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql")
-    val metadata = new MetadataBuilder()
+    val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
     assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, 
metadata) ==
       Some(LongType))
     assert(metadata.build().contains("binarylong"))
@@ -916,6 +919,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       Some(BooleanType))
     assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, 
metadata) ==
       Some(ByteType))
+    metadata.putBoolean("isSigned", value = false)
+    assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, 
metadata) ===
+      Some(ShortType))
   }
 
   test("SPARK-35446: MySQLDialect type mapping of float") {
@@ -1363,8 +1369,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
   }
 
   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
-    val schema = StructType(Seq(StructField("name", StringType, false, 
defaultMetadata),
-      StructField("theid", IntegerType, false, defaultMetadata)))
+    val schema = StructType(Seq(StructField("name", StringType, false, 
defaultMetadata(StringType)),
+      StructField("theid", IntegerType, false, defaultMetadata(IntegerType))))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, 
new Properties())
@@ -1384,8 +1390,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
     props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    val expectedSchema = new 
StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
-      f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata)).toArray)
+    val structType = CatalystSqlParser.parseTableSchema(customSchema)
+    val expectedSchema = new StructType(structType.map(
+      f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata(f.dataType))).toArray)
     assert(df.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     assert(df.count() === 3)
   }
@@ -1403,7 +1410,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
       val df = sql("select * from people_view")
       assert(df.schema.length === 2)
       val expectedSchema = new 
StructType(CatalystSqlParser.parseTableSchema(customSchema)
-        .map(f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata)).toArray)
+        .map(f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata(f.dataType))).toArray)
 
       assert(df.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
       assert(df.count() === 3)
@@ -1550,8 +1557,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
     }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
-    var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, 
defaultMetadata),
-      StructField("THEID", IntegerType, true, defaultMetadata)))
+    var schema = StructType(
+      Seq(StructField("NAME", VarcharType(32), true, 
defaultMetadata(VarcharType(32))),
+      StructField("THEID", IntegerType, true, defaultMetadata(IntegerType))))
     schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     val df = spark.read.format("jdbc")
       .option("Url", urlWithUserAndPass)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to