This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3857c16be81f [SPARK-47435][SPARK-45561][SQL][3.5] Fix overflow issue of MySQL UNSIGNED TINYINT caused by
3857c16be81f is described below

commit 3857c16be81f568cc5caf81f65941109bf5f2939
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Mar 19 13:37:28 2024 +0800

    [SPARK-47435][SPARK-45561][SQL][3.5] Fix overflow issue of MySQL UNSIGNED TINYINT caused by

    ### What changes were proposed in this pull request?

    SPARK-45561 mapped java.sql.Types.TINYINT to ByteType in the MySQL dialect, which caused
    unsigned TINYINT values to overflow: java.sql.Types.TINYINT is reported for TINYINT columns
    regardless of whether they are signed or unsigned. In this PR, we put the signedness
    information into the column metadata so that TINYINT can be mapped to either short or byte.

    ### Why are the changes needed?

    Bugfix.

    ### Does this PR introduce _any_ user-facing change?

    Yes. After this PR, users can read MySQL UNSIGNED TINYINT values again, as they could in
    versions before 3.5.0; this has been broken since 3.5.1.

    ### How was this patch tested?

    New tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #45579 from yaooqinn/SPARK-47435-B.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     |  9 ++--
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala    |  9 ++--
 .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala  |  6 ++-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 14 ++++-
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  9 ++--
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala     |  9 ++--
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 28 +++++++----
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  6 +--
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 10 ++--
 .../v2/jdbc/JDBCTableCatalogSuite.scala            | 54 +++++++++++++---------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 26 +++++++----
 11 files changed, 115 insertions(+), 65 deletions(-)
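For orientation before the full diff, here is the essence of the fix as a minimal standalone
Scala sketch. The helper name tinyIntCatalystType is illustrative only; in the patch the logic
lives in MySQLDialect.getCatalystType, and "isSigned" is the per-column metadata flag that
JdbcUtils now records for dialects:

    import java.sql.Types
    import org.apache.spark.sql.types._

    // A signed TINYINT (-128..127) fits in a Byte, but an unsigned TINYINT
    // (0..255) does not, so it has to widen to a Short.
    def tinyIntCatalystType(sqlType: Int, md: MetadataBuilder): Option[DataType] = {
      if (sqlType == Types.TINYINT) {
        if (md.build().getBoolean("isSigned")) Some(ByteType) else Some(ShortType)
      } else {
        None
      }
    }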
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 20fdc965874f..dcf4225d522d 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -56,10 +56,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
       + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
-      + "dbl DOUBLE, tiny TINYINT)").executeUpdate()
+      + "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate()
+
     conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
       + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
-      + "42.75, 1.0000000000000002, -128)").executeUpdate()
+      + "42.75, 1.0000000000000002, -128, 255)").executeUpdate()
 
     conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
       + "yr YEAR)").executeUpdate()
@@ -89,7 +90,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     val rows = df.collect()
     assert(rows.length == 1)
     val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
+    assert(types.length == 11)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Long"))
     assert(types(2).equals("class java.lang.Integer"))
@@ -100,6 +101,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types(7).equals("class java.lang.Double"))
     assert(types(8).equals("class java.lang.Double"))
     assert(types(9).equals("class java.lang.Byte"))
+    assert(types(10).equals("class java.lang.Short"))
     assert(rows(0).getBoolean(0) == false)
     assert(rows(0).getLong(1) == 0x225)
     assert(rows(0).getInt(2) == 17)
@@ -111,6 +113,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(rows(0).getDouble(7) == 42.75)
     assert(rows(0).getDouble(8) == 1.0000000000000002)
     assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(rows(0).getShort(10) == 0xff.toShort)
   }
 
   test("Date types") {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index 661b1277e9f0..9a78244f5326 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -85,11 +85,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
@@ -112,7 +114,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('CCSID'='UNICODE')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
index fc93f5cba4c0..e451cc2b8c52 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
@@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5e340f135c85..719b858b87b6 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -83,6 +83,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", true)
+    .build()
+
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -93,11 +98,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -145,7 +152,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 6b5dd043a617..33ce55b1c6c9 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -87,8 +87,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
     s"jdbc:oracle:thin:system/$oracle_password@//$ip:$port/xe"
   }
 
-  override val defaultMetadata: Metadata = new MetadataBuilder()
+  override def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType] || dataType.isInstanceOf[StringType])
     .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)")
     .build()
 
@@ -110,11 +111,13 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, super.defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", DecimalType(10, 0), true, super.defaultMetadata(DecimalType(10, 0)))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, super.defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DecimalType(19, 0), true, super.defaultMetadata(DecimalType(19, 0)))
     assert(t.schema === expectedSchema)
     // Update column type from LONG to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 85e85f8bf380..d2a18ff96b44 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('TABLESPACE'='pg_default')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 99f435611f2c..b8671455ac6f 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -49,18 +49,21 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def notSupportsTableComment: Boolean = false
 
-  def defaultMetadata: Metadata = new MetadataBuilder().putLong("scale", 0).build()
+  def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   def testUpdateColumnNullability(tbl: String): Unit = {
     sql(s"CREATE TABLE $catalogName.alt_table (ID STRING NOT NULL)")
     var t = spark.table(s"$catalogName.alt_table")
     // nullable is true in the expectedSchema because Spark always sets nullable to true
     // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445
-    var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
     t = spark.table(s"$catalogName.alt_table")
-    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+    expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val msg = intercept[AnalysisException] {
@@ -72,8 +75,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
   def testRenameColumn(tbl: String): Unit = {
     sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
     val t = spark.table(s"$tbl")
-    val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata)
-      .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata)
+    val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata())
+      .add("ID1", StringType, true, defaultMetadata())
+      .add("ID2", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
   }
 
@@ -83,16 +87,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
       var t = spark.table(s"$catalogName.alt_table")
-      var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
+      var expectedSchema = new StructType()
+        .add("ID", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C1", StringType, true, defaultMetadata())
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
      sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C3", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -125,7 +132,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
       val t = spark.table(s"$catalogName.alt_table")
-      val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6e7298710a5d..3521a50cd2dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -269,6 +269,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val fields = new Array[StructField](ncols)
     var i = 0
     while (i < ncols) {
+      val metadata = new MetadataBuilder()
       val columnName = rsmd.getColumnLabel(i + 1)
       val dataType = rsmd.getColumnType(i + 1)
       val typeName = rsmd.getColumnTypeName(i + 1)
@@ -289,8 +290,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-      metadata.putLong("scale", fieldScale)
       dataType match {
         case java.sql.Types.TIME =>
@@ -302,7 +301,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
           metadata.putBoolean("rowid", true)
         case _ =>
       }
-
+      metadata.putBoolean("isSigned", isSigned)
+      metadata.putLong("scale", fieldScale)
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ))
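For intuition on why the dialect change below widens the type: 255, the largest unsigned
TINYINT value, does not survive a round-trip through a signed byte. A two-line Scala
illustration (not part of the patch):

    val v = 255
    assert(v.toByte == -1)    // wraps around: the overflow this PR fixes
    assert(v.toShort == 255)  // lossless, hence the ShortType mapping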
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index c96da2d42a2a..bf27b748e24e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType}
+import org.apache.spark.sql.types._
 
 private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
@@ -102,8 +102,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1.
       // Explicitly converts it into StringType here.
       Some(StringType)
-    } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) {
-      Some(ByteType)
+    } else if (sqlType == Types.TINYINT) {
+      if (md.build().getBoolean("isSigned")) {
+        Some(ByteType)
+      } else {
+        Some(ShortType)
+      }
     } else None
   }
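The new behavior can also be observed through the public JdbcDialects API, along the lines of
the JDBCSuite test further below (a spark-shell-style sketch, not an excerpt of the patch):

    import org.apache.spark.sql.jdbc.JdbcDialects
    import org.apache.spark.sql.types._

    val mySqlDialect = JdbcDialects.get("jdbc:mysql")
    val md = new MetadataBuilder().putBoolean("isSigned", false)
    // An unsigned TINYINT column now maps to ShortType instead of ByteType.
    assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, md) ==
      Some(ShortType))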
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index eed64b873c45..078c708cc3fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -36,7 +36,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   val tempDir = Utils.createTempDir()
   val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.h2", classOf[JDBCTableCatalog].getName)
@@ -138,8 +142,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   test("load a table") {
     val t = spark.table("h2.test.people")
     val expectedSchema = new StructType()
-      .add("NAME", VarcharType(32), true, defaultMetadata)
-      .add("ID", IntegerType, true, defaultMetadata)
+      .add("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32)))
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     Seq(
       "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`",
@@ -181,13 +185,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
      var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("C1", IntegerType, true, defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C2", StringType, true, defaultMetadata(StringType))
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
       t = spark.table(tableName)
-      expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata)
+      expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -225,8 +229,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("C", IntegerType, true, defaultMetadata)
-        .add("C0", IntegerType, true, defaultMetadata)
+        .add("C", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C0", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Rename to already existing column
       checkError(
@@ -264,7 +268,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName DROP COLUMN C1")
       sql(s"ALTER TABLE $tableName DROP COLUMN c3")
       val t = spark.table(tableName)
-      val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
@@ -293,8 +298,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", DoubleType, true, defaultMetadata)
-        .add("deptno", DoubleType, true, defaultMetadata)
+        .add("ID", DoubleType, true, defaultMetadata(DoubleType))
+        .add("deptno", DoubleType, true, defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Update not existing column
       val msg1 = intercept[AnalysisException] {
@@ -331,8 +336,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("deptno", IntegerType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("deptno", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Update nullability of not existing column
       val msg = intercept[AnalysisException] {
@@ -388,8 +393,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("c1", IntegerType, true, defaultMetadata)
-        .add("c2", IntegerType, true, defaultMetadata)
+        .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("c2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -402,8 +407,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
         expectedSchema = new StructType()
-          .add("c1", IntegerType, true, defaultMetadata)
-          .add("c3", IntegerType, true, defaultMetadata)
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+          .add("c3", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -417,7 +422,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
-        expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -431,7 +437,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
-        expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(DoubleType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -445,7 +452,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
-        expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -507,8 +515,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)")
       val t = spark.table(tableName)
       val expected = new StructType()
-        .add("ID", CharType(10), true, defaultMetadata)
-        .add("deptno", VarcharType(30), true, defaultMetadata)
+        .add("ID", CharType(10), true, defaultMetadata(CharType(10)))
+        .add("deptno", VarcharType(30), true, defaultMetadata(VarcharType(30)))
       val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
       assert(t.schema === replaced)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index e151f2c0225d..f4702ee9edb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -77,7 +77,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  val defaultMetadata = new MetadataBuilder().putLong("scale", 0).build()
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
+    .build()
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -907,7 +910,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
   test("MySQLDialect catalyst type mapping") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql")
-    val metadata = new MetadataBuilder()
+    val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
     assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ==
       Some(LongType))
     assert(metadata.build().contains("binarylong"))
@@ -916,6 +919,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       Some(BooleanType))
     assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ==
       Some(ByteType))
+    metadata.putBoolean("isSigned", value = false)
+    assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ===
+      Some(ShortType))
   }
 
   test("SPARK-35446: MySQLDialect type mapping of float") {
@@ -1363,8 +1369,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
-    val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata),
-      StructField("theid", IntegerType, false, defaultMetadata)))
+    val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata(StringType)),
+      StructField("theid", IntegerType, false, defaultMetadata(IntegerType))))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties())
@@ -1384,8 +1390,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
-      f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+    val structType = CatalystSqlParser.parseTableSchema(customSchema)
+    val expectedSchema = new StructType(structType.map(
+      f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray)
     assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     assert(df.count() === 3)
   }
@@ -1403,7 +1410,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     val df = sql("select * from people_view")
     assert(df.schema.length === 2)
     val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema)
-      .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
+      .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray)
     assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     assert(df.count() === 3)
 
@@ -1550,8 +1557,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
-    var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, defaultMetadata),
-      StructField("THEID", IntegerType, true, defaultMetadata)))
+    var schema = StructType(
+      Seq(StructField("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32))),
+        StructField("THEID", IntegerType, true, defaultMetadata(IntegerType))))
     schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     val df = spark.read.format("jdbc")
       .option("Url", urlWithUserAndPass)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org