This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e31d0726a7b [SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType e31d0726a7b is described below commit e31d0726a7baae3ff030ace25d9e2e1bfb1a7da6 Author: Eugene-Mark <eugene.ma.twe...@gmail.com> AuthorDate: Mon Jun 20 18:10:44 2022 -0500 [SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType ### What changes were proposed in this pull request? - Implemented getCatalystType method in TeradataDialect - Handle Types.NUMERIC explicitly ### Why are the changes needed? When a table is loaded from Teradata, a column whose Teradata type is `Number` is converted to `DecimalType(38,0)`, which loses the fractional part of the original data. ### Does this PR introduce _any_ user-facing change? Yes, it will convert the Number type to DecimalType(38,18) when no precision is specified and the scale is 0, so that the fractional part is kept. ### How was this patch tested? A unit test is added to JDBCSuite.scala. Closes #36499 from Eugene-Mark/teradata-loading. Lead-authored-by: Eugene-Mark <eugene.ma.twe...@gmail.com> Co-authored-by: Eugene <eugene...@intel.com> Co-authored-by: Eugene <eugene.ma.twe...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- docs/sql-migration-guide.md | 4 ++++ .../org/apache/spark/sql/types/DecimalType.scala | 3 ++- .../apache/spark/sql/jdbc/TeradataDialect.scala | 28 ++++++++++++++++++++++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 27 ++++++++++++++++++++- 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 43ad780db08..ab0a7af7bf1 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -22,6 +22,10 @@ license: | * Table of contents {:toc} +## Upgrading from Spark SQL 3.3 to 3.4 + + - Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). 
In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed. + ## Upgrading from Spark SQL 3.2 to 3.3 - Since Spark 3.3, the `histogram_numeric` function in Spark SQL returns an output type of an array of structs (x, y), where the type of the 'x' field in the return value is propagated from the input values consumed in the aggregate function. In Spark 3.2 or earlier, 'x' always had double type. Optionally, use the configuration `spark.sql.legacy.histogramNumericPropagateInputType` since Spark 3.3 to revert back to the previous behavior. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 08ddd12ef7d..ce325024c3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -126,7 +126,8 @@ object DecimalType extends AbstractDataType { val MAX_PRECISION = 38 val MAX_SCALE = 38 - val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, 18) + val DEFAULT_SCALE = 18 + val SYSTEM_DEFAULT: DecimalType = DecimalType(MAX_PRECISION, DEFAULT_SCALE) val USER_DEFAULT: DecimalType = DecimalType(10, 0) val MINIMUM_ADJUSTED_SCALE = 6 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 79fb710cf03..2b2d1fb7e86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.jdbc +import java.sql.Types import java.util.Locale import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc} @@ -96,4 +97,31 @@ private case object TeradataDialect extends JdbcDialect { override def 
getLimitClause(limit: Integer): String = { "" } + + override def getCatalystType( + sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { + sqlType match { + case Types.NUMERIC => + if (md == null) { + Some(DecimalType.SYSTEM_DEFAULT) + } else { + val scale = md.build().getLong("scale") + // In Teradata, define Number without parameter means precision and scale is flexible. + // However, in this case, the scale returned from JDBC is 0, which will lead to + // fractional part loss. And the precision returned from JDBC is 40, which conflicts to + // DecimalType.MAX_PRECISION. + // Handle this special case by adding explicit conversion to system default decimal type. + if (size == 40) { + if (scale == 0) Some(DecimalType.SYSTEM_DEFAULT) + // In Teradata, Number(*, scale) is valid but in this case, the precision + // returned from JDBC is also 40, which conflicts to DecimalType.MAX_PRECISION. + else Some(DecimalType(DecimalType.MAX_PRECISION, scale.toInt)) + } else { + // Normal case, Number(precision, scale) is explicitly set in Teradata + Some(DecimalType(size, scale.toInt)) + } + } + case _ => None + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index a07ef5ecd30..68bd189eef5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -1356,7 +1356,32 @@ class JDBCSuite extends QueryTest map(_.databaseTypeDefinition).get == "CHAR(1)") } - test("Checking metrics correctness with JDBC") { + test("SPARK-38846: TeradataDialect catalyst type mapping") { + val teradataDialect = JdbcDialects.get("jdbc:teradata") + val metadata = new MetadataBuilder().putString("name", "test_column").putLong("scale", 0) + // When Number(*)/Number is specified, default DecimalType should be returned + val flexiblePrecision = 40 + 
assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", + flexiblePrecision, metadata) == Some(DecimalType.SYSTEM_DEFAULT)) + val specifiedScale = 10 + val specifiedPrecision = 10 + metadata.putLong("scale", specifiedScale) + // Both precision and scale is set explicitly + assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", + specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, specifiedScale))) + // When precision is not specified, MAX_PRECISION should be used + assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", + flexiblePrecision, metadata) == Some(DecimalType(DecimalType.MAX_PRECISION, specifiedScale))) + // When precision and scale is set explicitly and scale is 0 + metadata.putLong("scale", 0) + assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", + specifiedPrecision, metadata) == Some(DecimalType(specifiedPrecision, 0))) + // When MetadataBuilder is null, default DecimalType should be returned + assert(teradataDialect.getCatalystType(java.sql.Types.NUMERIC, "NUMBER", + specifiedPrecision, null) == Some(DecimalType.SYSTEM_DEFAULT)) + } + + test("Checking metrics correctness with JDBC") { val foobarCnt = spark.table("foobar").count() val res = InputOutputMetricsHelper.run(sql("SELECT * FROM foobar").toDF()) assert(res === (foobarCnt, 0L, foobarCnt) :: Nil) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org