This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8bd42cbdb6bf [SPARK-47435][SQL] Fix overflow issue of MySQL UNSIGNED TINYINT caused by SPARK-45561 8bd42cbdb6bf is described below commit 8bd42cbdb6bfa40aead94570b06e926f8e8aa9e1 Author: Kent Yao <y...@apache.org> AuthorDate: Mon Mar 18 08:56:55 2024 -0700 [SPARK-47435][SQL] Fix overflow issue of MySQL UNSIGNED TINYINT caused by SPARK-45561 ### What changes were proposed in this pull request? SPARK-45561 mapped java.sql.Types.TINYINT to ByteType in MySQL Dialect, which caused unsigned TINYINT overflow. This is because java.sql.Types.TINYINT is reported regardless of whether the MySQL column is signed or unsigned. In this PR, we put the signed info into the metadata for mapping TINYINT to short or byte. ### Why are the changes needed? bugfix ### Does this PR introduce _any_ user-facing change? Users can read MySQL UNSIGNED TINYINT values after this PR like versions before 3.5.0, which has been broken since 3.5.1 ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45556 from yaooqinn/SPARK-47435. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/jdbc/MySQLIntegrationSuite.scala | 9 ++-- .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala | 9 ++-- .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala | 6 ++- .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 15 ++++-- .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala | 9 ++-- .../sql/jdbc/v2/PostgresIntegrationSuite.scala | 9 ++-- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 26 ++++++---- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 5 +- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 10 ++-- .../v2/jdbc/JDBCTableCatalogSuite.scala | 60 ++++++++++++---------- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 +++++---- 11 files changed, 114 insertions(+), 68 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index b1d239337aa0..79e88f109534 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -57,10 +57,11 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), " + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, " - + "dbl DOUBLE, tiny TINYINT)").executeUpdate() + + "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate() + conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', " + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, " - + "42.75, 1.0000000000000002, -128)").executeUpdate() + + "42.75, 1.0000000000000002, -128, 255)").executeUpdate() conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt 
DATETIME, ts TIMESTAMP, " + "yr YEAR)").executeUpdate() @@ -90,7 +91,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) - assert(types.length == 10) + assert(types.length == 11) assert(types(0).equals("class java.lang.Boolean")) assert(types(1).equals("class java.lang.Long")) assert(types(2).equals("class java.lang.Integer")) @@ -101,6 +102,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(types(7).equals("class java.lang.Double")) assert(types(8).equals("class java.lang.Double")) assert(types(9).equals("class java.lang.Byte")) + assert(types(10).equals("class java.lang.Short")) assert(rows(0).getBoolean(0) == false) assert(rows(0).getLong(1) == 0x225) assert(rows(0).getInt(2) == 17) @@ -112,6 +114,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(rows(0).getDouble(7) == 42.75) assert(rows(0).getDouble(8) == 1.0000000000000002) assert(rows(0).getByte(9) == 0x80.toByte) + assert(rows(0).getShort(10) == 0xff.toShort) } test("Date types") { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala index c3ec7e1925fa..6c1b7fdd1be5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala @@ -70,11 +70,13 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest { override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + var expectedSchema = new StructType() + .add("ID", IntegerType, true, 
defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata) + expectedSchema = new StructType() + .add("ID", DoubleType, true, defaultMetadata(DoubleType)) assert(t.schema === expectedSchema) // Update column type from DOUBLE to STRING val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)" @@ -97,7 +99,8 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest { sql(s"CREATE TABLE $tbl (ID INT)" + s" TBLPROPERTIES('CCSID'='UNICODE')") val t = spark.table(tbl) - val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + val expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala index fc93f5cba4c0..e451cc2b8c52 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala @@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + var expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata) + expectedSchema = new StructType() + .add("ID", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER" diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala index 5e340f135c85..2b189db2c1cb 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala @@ -83,6 +83,12 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest private var mySQLVersion = -1 + override def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder() + .putLong("scale", 0) + .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", true) + .build() + override def tablePreparation(connection: Connection): Unit = { mySQLVersion = connection.getMetaData.getDatabaseMajorVersion connection.prepareStatement( @@ -93,11 +99,13 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + var expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) + expectedSchema = new StructType() + .add("ID", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER 
val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER" @@ -145,7 +153,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest sql(s"CREATE TABLE $tbl (ID INT)" + s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')") val t = spark.table(tbl) - val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + val expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala index 591147413486..0aa2905f93b8 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala @@ -77,9 +77,10 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes override val namespaceOpt: Option[String] = Some("SYSTEM") override val db = new OracleDatabaseOnDocker - override val defaultMetadata: Metadata = new MetadataBuilder() + override def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder() .putLong("scale", 0) .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", dataType.isInstanceOf[NumericType] || dataType.isInstanceOf[StringType]) .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)") .build() @@ -101,11 +102,13 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, super.defaultMetadata) + var expectedSchema = new StructType() + .add("ID", DecimalType(10, 0), 
true, super.defaultMetadata(DecimalType(10, 0))) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, super.defaultMetadata) + expectedSchema = new StructType() + .add("ID", DecimalType(19, 0), true, super.defaultMetadata(DecimalType(19, 0))) assert(t.schema === expectedSchema) // Update column type from LONG to INTEGER val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER" diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 233a634cac67..1f09c2fd3fc5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT override def testUpdateColumnType(tbl: String): Unit = { sql(s"CREATE TABLE $tbl (ID INTEGER)") var t = spark.table(tbl) - var expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + var expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING") t = spark.table(tbl) - expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) + expectedSchema = new StructType() + .add("ID", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) // Update column type from STRING to INTEGER val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER" @@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT sql(s"CREATE TABLE $tbl (ID INT)" + s" 
TBLPROPERTIES('TABLESPACE'='pg_default')") val t = spark.table(tbl) - val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata) + val expectedSchema = new StructType() + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 8dd377f4a35f..c80fbfc748dd 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -49,9 +49,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def notSupportsTableComment: Boolean = false - def defaultMetadata: Metadata = new MetadataBuilder() + def defaultMetadata(dataType: DataType = StringType): Metadata = new MetadataBuilder() .putLong("scale", 0) .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", dataType.isInstanceOf[NumericType]) .build() def testUpdateColumnNullability(tbl: String): Unit = { @@ -59,11 +60,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu var t = spark.table(s"$catalogName.alt_table") // nullable is true in the expectedSchema because Spark always sets nullable to true // regardless of the JDBC metadata https://github.com/apache/spark/pull/18445 - var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) + var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL") t = spark.table(s"$catalogName.alt_table") - expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) + expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata()) assert(t.schema === expectedSchema) // Update nullability of not existing column val msg = intercept[AnalysisException] { @@ -75,8 +76,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def testRenameColumn(tbl: String): Unit = { sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED") val t = spark.table(s"$tbl") - val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata) - .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, true, defaultMetadata) + val expectedSchema = new StructType().add("RENAMED", StringType, true, defaultMetadata()) + .add("ID1", StringType, true, defaultMetadata()) + .add("ID2", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) } @@ -86,16 +88,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu withTable(s"$catalogName.alt_table") { sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)") var t = spark.table(s"$catalogName.alt_table") - var expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata) + var expectedSchema = new StructType() + .add("ID", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 STRING)") t = spark.table(s"$catalogName.alt_table") - expectedSchema = expectedSchema.add("C1", StringType, true, defaultMetadata) - .add("C2", StringType, true, defaultMetadata) + expectedSchema = expectedSchema + .add("C1", StringType, true, defaultMetadata()) + .add("C2", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)") t = spark.table(s"$catalogName.alt_table") - expectedSchema = expectedSchema.add("C3", StringType, true, defaultMetadata) + expectedSchema = expectedSchema + .add("C3", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) // Add already existing column 
checkError( @@ -128,7 +133,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1") sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3") val t = spark.table(s"$catalogName.alt_table") - val expectedSchema = new StructType().add("C2", StringType, true, defaultMetadata) + val expectedSchema = new StructType() + .add("C2", StringType, true, defaultMetadata()) assert(t.schema === expectedSchema) // Drop not existing column val msg = intercept[AnalysisException] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index bc88ab9bfcae..84d87f008217 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -274,6 +274,7 @@ object JdbcUtils extends Logging with SQLConfHelper { val fields = new Array[StructField](ncols) var i = 0 while (i < ncols) { + val metadata = new MetadataBuilder() val columnName = rsmd.getColumnLabel(i + 1) val dataType = rsmd.getColumnType(i + 1) val typeName = rsmd.getColumnTypeName(i + 1) @@ -294,8 +295,6 @@ object JdbcUtils extends Logging with SQLConfHelper { } else { rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls } - val metadata = new MetadataBuilder() - metadata.putLong("scale", fieldScale) dataType match { case java.sql.Types.TIME => @@ -307,7 +306,9 @@ object JdbcUtils extends Logging with SQLConfHelper { metadata.putBoolean("rowid", true) case _ => } + metadata.putBoolean("isSigned", isSigned) metadata.putBoolean("isTimestampNTZ", isTimestampNTZ) + metadata.putLong("scale", fieldScale) val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ)) 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 42b1a3a2854e..4e5f092b193c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType, TimestampType} +import org.apache.spark.sql.types._ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { @@ -107,8 +107,12 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1. // Explicitly converts it into StringType here. 
Some(StringType) - case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) => - Some(ByteType) + case Types.TINYINT => + if (md.build().getBoolean("isSigned")) { + Some(ByteType) + } else { + Some(ShortType) + } case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) => // scalastyle:off line.size.limit // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index fc313de6c8fe..f4e7921e88bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -37,9 +37,11 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val tempDir = Utils.createTempDir() val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass" - val defaultMetadata = new MetadataBuilder() + + def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder() .putLong("scale", 0) .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", dataType.isInstanceOf[NumericType]) .build() override def sparkConf: SparkConf = super.sparkConf @@ -142,8 +144,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("load a table") { val t = spark.table("h2.test.people") val expectedSchema = new StructType() - .add("NAME", VarcharType(32), true, defaultMetadata) - .add("ID", IntegerType, true, defaultMetadata) + .add("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32))) + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) Seq( "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`", @@ -185,13 +187,13 @@ class 
JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)") var t = spark.table(tableName) var expectedSchema = new StructType() - .add("ID", IntegerType, true, defaultMetadata) - .add("C1", IntegerType, true, defaultMetadata) - .add("C2", StringType, true, defaultMetadata) + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) + .add("C1", IntegerType, true, defaultMetadata(IntegerType)) + .add("C2", StringType, true, defaultMetadata(StringType)) assert(t.schema === expectedSchema) sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)") t = spark.table(tableName) - expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata) + expectedSchema = expectedSchema.add("c3", DoubleType, true, defaultMetadata(DoubleType)) assert(t.schema === expectedSchema) // Add already existing column checkError( @@ -229,8 +231,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C") val t = spark.table(tableName) val expectedSchema = new StructType() - .add("C", IntegerType, true, defaultMetadata) - .add("C0", IntegerType, true, defaultMetadata) + .add("C", IntegerType, true, defaultMetadata(IntegerType)) + .add("C0", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) // Rename to already existing column checkError( @@ -268,7 +270,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName DROP COLUMN C1") sql(s"ALTER TABLE $tableName DROP COLUMN c3") val t = spark.table(tableName) - val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata) + val expectedSchema = new StructType() + .add("C2", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) // Drop not existing column val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column" @@ -307,8 +310,8 @@ class JDBCTableCatalogSuite 
extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE") val t = spark.table(tableName) val expectedSchema = new StructType() - .add("ID", DoubleType, true, defaultMetadata) - .add("deptno", DoubleType, true, defaultMetadata) + .add("ID", DoubleType, true, defaultMetadata(DoubleType)) + .add("deptno", DoubleType, true, defaultMetadata(DoubleType)) assert(t.schema === expectedSchema) // Update not existing column val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE" @@ -356,8 +359,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL") val t = spark.table(tableName) val expectedSchema = new StructType() - .add("ID", IntegerType, true, defaultMetadata) - .add("deptno", IntegerType, true, defaultMetadata) + .add("ID", IntegerType, true, defaultMetadata(IntegerType)) + .add("deptno", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) // Update nullability of not existing column val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL" @@ -491,8 +494,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)") var t = spark.table(tableName) var expectedSchema = new StructType() - .add("c1", IntegerType, true, defaultMetadata) - .add("c2", IntegerType, true, defaultMetadata) + .add("c1", IntegerType, true, defaultMetadata(IntegerType)) + .add("c2", IntegerType, true, defaultMetadata(IntegerType)) assert(t.schema === expectedSchema) withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { @@ -516,8 +519,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3") expectedSchema = new StructType() - .add("c1", IntegerType, true, defaultMetadata) - .add("c3", 
IntegerType, true, defaultMetadata) + .add("c1", IntegerType, true, defaultMetadata(IntegerType)) + .add("c3", IntegerType, true, defaultMetadata(IntegerType)) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -542,7 +545,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName DROP COLUMN C3") - expectedSchema = new StructType().add("c1", IntegerType, true, defaultMetadata) + expectedSchema = new StructType() + .add("c1", IntegerType, true, defaultMetadata(IntegerType)) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -566,7 +570,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE") - expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata) + expectedSchema = new StructType() + .add("c1", DoubleType, true, defaultMetadata(DoubleType)) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -590,7 +595,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL") - expectedSchema = new StructType().add("c1", DoubleType, true, defaultMetadata) + expectedSchema = new StructType() + .add("c1", DoubleType, true, defaultMetadata(IntegerType)) t = spark.table(tableName) assert(t.schema === expectedSchema) } @@ -660,8 +666,8 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)") val t = spark.table(tableName) val expected = new StructType() - .add("ID", CharType(10), true, defaultMetadata) - .add("deptno", VarcharType(30), true, defaultMetadata) + .add("ID", CharType(10), true, defaultMetadata(CharType(10))) + .add("deptno", VarcharType(30), true, 
defaultMetadata(VarcharType(30))) val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected) assert(t.schema === replaced) } @@ -674,13 +680,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { .executeUpdate()) withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") { val expected = new StructType() - .add("ID", StringType, true, defaultMetadata) - .add("DEPTNO", StringType, true, defaultMetadata) + .add("ID", StringType, true, defaultMetadata(StringType)) + .add("DEPTNO", StringType, true, defaultMetadata(StringType)) assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === expected) } val expected = new StructType() - .add("ID", CharType(5), true, defaultMetadata) - .add("DEPTNO", VarcharType(10), true, defaultMetadata) + .add("ID", CharType(5), true, defaultMetadata(CharType(5))) + .add("DEPTNO", VarcharType(10), true, defaultMetadata(VarcharType(10))) val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected) assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === replaced) } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 9e8df6d733e0..a2dac5a9e1e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -78,9 +78,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession { } } - val defaultMetadata = new MetadataBuilder() + def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder() .putLong("scale", 0) .putBoolean("isTimestampNTZ", false) + .putBoolean("isSigned", dataType.isInstanceOf[NumericType]) .build() override def beforeAll(): Unit = { @@ -928,7 +929,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession { test("MySQLDialect catalyst type mapping") { val mySqlDialect = JdbcDialects.get("jdbc:mysql") - val metadata = new MetadataBuilder() + 
val metadata = new MetadataBuilder().putBoolean("isSigned", value = true) assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) == Some(LongType)) assert(metadata.build().contains("binarylong")) @@ -937,6 +938,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession { Some(BooleanType)) assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) == Some(ByteType)) + metadata.putBoolean("isSigned", value = false) + assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) === + Some(ShortType)) } test("SPARK-35446: MySQLDialect type mapping of float") { @@ -1386,8 +1390,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession { } test("SPARK-16848: jdbc API throws an exception for user specified schema") { - val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata), - StructField("theid", IntegerType, false, defaultMetadata))) + val schema = StructType(Seq(StructField("name", StringType, false, defaultMetadata(StringType)), + StructField("theid", IntegerType, false, defaultMetadata(IntegerType)))) val parts = Array[String]("THEID < 2", "THEID >= 2") val e1 = intercept[AnalysisException] { spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties()) @@ -1407,8 +1411,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession { props.put("customSchema", customSchema) val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props) assert(df.schema.size === 2) - val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map( - f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) + val structType = CatalystSqlParser.parseTableSchema(customSchema) + val expectedSchema = new StructType(structType.map( + f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray) assert(df.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) assert(df.count() === 3) } @@ -1426,7 +1431,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession { val df = sql("select * from people_view") assert(df.schema.length === 2) val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema) - .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) + .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata(f.dataType))).toArray) assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) assert(df.count() === 3) @@ -1577,8 +1582,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession { } test("jdbc data source shouldn't have unnecessary metadata in its schema") { - var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, defaultMetadata), - StructField("THEID", IntegerType, true, defaultMetadata))) + var schema = StructType( + Seq(StructField("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32))), + StructField("THEID", IntegerType, true, defaultMetadata(IntegerType)))) schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema) val df = spark.read.format("jdbc") .option("Url", urlWithUserAndPass) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org