This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 99eb3ff  [SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2
99eb3ff is described below

commit 99eb3ff226dd62691211db0ed9184195e702e20c
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Wed Jul 21 09:55:09 2021 -0700

[SPARK-36227][SQL][3.2] Remove TimestampNTZ type support in Spark 3.2

### What changes were proposed in this pull request?

Remove TimestampNTZ type support in the production code of Spark 3.2.
To achieve this, the PR adds the check `Utils.isTesting` to the following code branches:
- the keywords "timestamp_ntz" and "timestamp_ltz" in the parser
- the new expressions from https://issues.apache.org/jira/browse/SPARK-35662
- the use of `java.time.LocalDateTime` as the external type for TimestampNTZType
- `SQLConf.timestampType`, which determines the default timestamp type of Spark SQL

This minimizes the code difference from the master branch, and it ensures that users won't think TimestampNTZ is already available in Spark 3.2.
The downside is that users can still find TimestampNTZType under the package `org.apache.spark.sql.types`. Nothing else should remain exposed.

### Why are the changes needed?

As of now, there are some blockers for delivering the TimestampNTZ project in Spark 3.2:

- In the Hive Thrift server, both TimestampType and TimestampNTZType are mapped to the same timestamp type, which can cause confusion for users.
- For the Parquet data source, newly written TimestampNTZType Parquet columns will be read as TimestampType in old Spark releases. Also, we need to decide the merged schema for files that mix TimestampType and TimestampNTZType.
- The type coercion rules for TimestampNTZType are incomplete. For example, what should the data type of the IN clause "IN(Timestamp'2020-01-01 00:00:00', TimestampNtz'2020-01-01 00:00:00')" be?
- It is tricky to support TimestampNTZType in JSON/CSV data readers.
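
As a rough illustration of the `Utils.isTesting` guard described under "What changes were proposed", the following self-contained Scala sketch gates a `LocalDateTime` branch behind a testing flag. The `isTesting` helper, the `SketchType` hierarchy, and `inferType` are hypothetical stand-ins rather than Spark's own classes, and the environment variable and system property names are assumptions about how Spark derives its flag.

```scala
import java.time.{Instant, LocalDateTime}

object TestingGuardSketch {
  // Hypothetical stand-in for Spark's `Utils.isTesting`. Assumption: the flag is
  // driven by the SPARK_TESTING environment variable or the `spark.testing` property.
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  // Simplified stand-ins for Catalyst data types (not the real classes).
  sealed trait SketchType
  case object TimestampLTZ extends SketchType
  case object TimestampNTZ extends SketchType

  // Mirrors the shape of the guarded branches in the diff: the LocalDateTime case
  // only matches when the testing flag is set; otherwise the value falls through
  // to the default case, as if the type were unsupported.
  def inferType(value: Any, testing: Boolean = isTesting): Option[SketchType] =
    value match {
      case _: Instant => Some(TimestampLTZ)
      case _: LocalDateTime if testing => Some(TimestampNTZ)
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(inferType(Instant.now(), testing = false))       // prints Some(TimestampLTZ)
    println(inferType(LocalDateTime.now(), testing = false)) // prints None
    println(inferType(LocalDateTime.now(), testing = true))  // prints Some(TimestampNTZ)
  }
}
```

Because the guard sits on the pattern itself, the non-test path needs no separate error branch: the value simply reaches whatever default handling already existed, which keeps the change small relative to the master branch.
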
We need to avoid regressions as much as we can. There are 10 days left before the expected 3.2 RC date. So, I propose to **release the TimestampNTZ type in Spark 3.3 instead of Spark 3.2**, so that we have enough time to design careful solutions to these issues.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests, plus manual tests from spark-shell to validate that the features are gone.

New functions:
```
spark.sql("select to_timestamp_ntz'2021-01-01 00:00:00'").show()
spark.sql("select to_timestamp_ltz'2021-01-01 00:00:00'").show()
spark.sql("select make_timestamp_ntz(1,1,1,1,1,1)").show()
spark.sql("select make_timestamp_ltz(1,1,1,1,1,1)").show()
spark.sql("select localtimestamp()").show()
```

The SQL configuration `spark.sql.timestampType` should not work in 3.2:
```
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.sql("select make_timestamp(1,1,1,1,1,1)").schema
spark.sql("select to_timestamp('2021-01-01 00:00:00')").schema
spark.sql("select timestamp'2021-01-01 00:00:00'").schema
Seq((1, java.sql.Timestamp.valueOf("2021-01-01 00:00:00"))).toDF("i", "ts").write.partitionBy("ts").parquet("/tmp/test")
spark.read.parquet("/tmp/test").schema
```

LocalDateTime is not supported as a built-in external type:
```
Seq(LocalDateTime.now()).toDF()
org.apache.spark.sql.catalyst.expressions.Literal(java.time.LocalDateTime.now())
org.apache.spark.sql.catalyst.expressions.Literal(0L, TimestampNTZType)
```

Closes #33444 from gengliangwang/banNTZ.
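
The manual checks above expect the NTZ functions to be missing and `spark.sql.timestampType` to have no effect outside tests. A minimal sketch of how a conditionally merged function map and a guarded configuration match could produce that behaviour is shown below. It assumes a plain `testing` flag in place of `Utils.isTesting`; the string values are placeholders for Spark's `(ExpressionInfo, FunctionBuilder)` pairs, and `builtinFunctions` and `resolveTimestampType` are hypothetical names.

```scala
import java.util.Locale

object ConditionalRegistrySketch {
  // Function names taken from the commit; the String values are placeholders
  // for the (ExpressionInfo, FunctionBuilder) pairs used by the real registry.
  def ntzFunctions(testing: Boolean): Map[String, String] =
    if (testing) {
      Map(
        "to_timestamp_ntz" -> "ParseToTimestampNTZ",
        "to_timestamp_ltz" -> "ParseToTimestampLTZ",
        "make_timestamp_ntz" -> "MakeTimestampNTZ",
        "make_timestamp_ltz" -> "MakeTimestampLTZ",
        "localtimestamp" -> "LocalTimestamp"
      )
    } else {
      Map.empty
    }

  // The always-available entries are merged with the conditional ones, so the
  // NTZ functions are simply absent when the flag is off.
  def builtinFunctions(testing: Boolean): Map[String, String] =
    Map("make_timestamp" -> "MakeTimestamp") ++ ntzFunctions(testing)

  // Any value other than TIMESTAMP_NTZ under test falls back to the default
  // type, which is why setting the configuration has no effect in production.
  def resolveTimestampType(confValue: String, testing: Boolean): String =
    confValue.toUpperCase(Locale.ROOT) match {
      case "TIMESTAMP_NTZ" if testing => "TimestampNTZType"
      case _ => "TimestampType"
    }

  def main(args: Array[String]): Unit = {
    println(builtinFunctions(testing = false).contains("localtimestamp"))  // prints false
    println(resolveTimestampType("TIMESTAMP_NTZ", testing = false))        // prints TimestampType
  }
}
```
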
Authored-by: Gengliang Wang <gengli...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/sql/Encoders.scala |  8 -------
 .../sql/catalyst/CatalystTypeConverters.scala      |  4 +++-
 .../spark/sql/catalyst/JavaTypeInference.scala     | 11 ++++++---
 .../spark/sql/catalyst/ScalaReflection.scala       | 12 +++++++---
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 22 ++++++++++++-----
 .../apache/spark/sql/catalyst/dsl/package.scala    |  4 ----
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 10 +++++---
 .../spark/sql/catalyst/expressions/literals.scala  | 14 +++++++----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 11 +++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 +++++++-----
 .../scala/org/apache/spark/sql/SQLImplicits.scala  |  3 ---
 .../org/apache/spark/sql/JavaDatasetSuite.java     |  8 -------
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 13 +---------
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  5 ----
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 28 ----------------------
 15 files changed, 70 insertions(+), 98 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index f23f3c6..e689484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -115,14 +115,6 @@ object Encoders {
   def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()
 
   /**
-   * Creates an encoder that serializes instances of the `java.time.LocalDateTime` class
-   * to the internal representation of nullable Catalyst's TimestampNTZType.
-   *
-   * @since 3.2.0
-   */
-  def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()
-
-  /**
    * An encoder for nullable timestamp type.
    *
    * @since 1.6.0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index f3b8e69..d929897 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType._
 import org.apache.spark.sql.types.YearMonthIntervalType._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -511,7 +512,8 @@ object CatalystTypeConverters {
     case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
     case t: Timestamp => TimestampConverter.toCatalyst(t)
     case i: Instant => InstantConverter.toCatalyst(i)
-    case l: LocalDateTime => TimestampNTZConverter.toCatalyst(l)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting => TimestampNTZConverter.toCatalyst(l)
     case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
     case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 903072a..274b74b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -119,7 +120,9 @@ object JavaTypeInference {
       case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
       case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
       case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-      case c: Class[_] if c == classOf[java.time.LocalDateTime] => (TimestampNTZType, true)
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case c: Class[_] if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
+        (TimestampNTZType, true)
       case c: Class[_] if c == classOf[java.time.Duration] => (DayTimeIntervalType(), true)
       case c: Class[_] if c == classOf[java.time.Period] => (YearMonthIntervalType(), true)
 
@@ -251,7 +254,8 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
-      case c if c == classOf[java.time.LocalDateTime] =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
         createDeserializerForLocalDateTime(path)
 
       case c if c == classOf[java.time.Duration] =>
@@ -413,7 +417,8 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Timestamp] =>
         createSerializerForSqlTimestamp(inputObject)
 
-      case c if c == classOf[java.time.LocalDateTime] =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case c if c == classOf[java.time.LocalDateTime] && Utils.isTesting =>
         createSerializerForLocalDateTime(inputObject)
 
       case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 4de7e5c..b4761f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -752,7 +753,8 @@ object ScalaReflection extends ScalaReflection {
         Schema(TimestampType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
         Schema(TimestampType, nullable = true)
-      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) && Utils.isTesting =>
         Schema(TimestampNTZType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
       case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
@@ -858,7 +860,9 @@ object ScalaReflection extends ScalaReflection {
     StringType -> classOf[UTF8String],
     DateType -> classOf[DateType.InternalType],
     TimestampType -> classOf[TimestampType.InternalType],
-    TimestampNTZType -> classOf[TimestampNTZType.InternalType],
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[TimestampNTZType.InternalType] else classOf[java.lang.Object]),
     BinaryType -> classOf[BinaryType.InternalType],
     CalendarIntervalType -> classOf[CalendarInterval]
   )
@@ -873,7 +877,9 @@ object ScalaReflection extends ScalaReflection {
     DoubleType -> classOf[java.lang.Double],
     DateType -> classOf[java.lang.Integer],
     TimestampType -> classOf[java.lang.Long],
-    TimestampNTZType -> classOf[java.lang.Long]
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    TimestampNTZType ->
+      (if (Utils.isTesting) classOf[java.lang.Long] else classOf[java.lang.Object])
   )
 
   def dataTypeJavaClass(dt: DataType): Class[_] = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 5fce4b6..63b1525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -317,6 +318,20 @@ object FunctionRegistry {
 
   val FUNC_ALIAS = TreeNodeTag[String]("functionAliasName")
 
+  val expressionsForTimestampNTZSupport: Map[String, (ExpressionInfo, FunctionBuilder)] =
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    if (Utils.isTesting) {
+      Map(
+        expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
+        expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
+        expression[MakeTimestampNTZ]("make_timestamp_ntz"),
+        expression[MakeTimestampLTZ]("make_timestamp_ltz"),
+        expression[LocalTimestamp]("localtimestamp")
+      )
+    } else {
+      Map.empty
+    }
+
   // Note: Whenever we add a new entry here, make sure we also update ExpressionToSQLSuite
   val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
     // misc non-aggregate functions
@@ -519,7 +534,6 @@ object FunctionRegistry {
     expression[CurrentDate]("current_date"),
     expression[CurrentTimestamp]("current_timestamp"),
     expression[CurrentTimeZone]("current_timezone"),
-    expression[LocalTimestamp]("localtimestamp"),
     expression[DateDiff]("datediff"),
     expression[DateAdd]("date_add"),
     expression[DateFormatClass]("date_format"),
@@ -542,8 +556,6 @@ object FunctionRegistry {
     expression[ParseToDate]("to_date"),
     expression[ToUnixTimestamp]("to_unix_timestamp"),
     expression[ToUTCTimestamp]("to_utc_timestamp"),
-    expression[ParseToTimestampNTZ]("to_timestamp_ntz"),
-    expression[ParseToTimestampLTZ]("to_timestamp_ltz"),
     expression[TruncDate]("trunc"),
     expression[TruncTimestamp]("date_trunc"),
     expression[UnixTimestamp]("unix_timestamp"),
@@ -555,8 +567,6 @@ object FunctionRegistry {
     expression[SessionWindow]("session_window"),
     expression[MakeDate]("make_date"),
     expression[MakeTimestamp]("make_timestamp"),
-    expression[MakeTimestampNTZ]("make_timestamp_ntz"),
-    expression[MakeTimestampLTZ]("make_timestamp_ltz"),
     expression[MakeInterval]("make_interval"),
     expression[MakeDTInterval]("make_dt_interval"),
     expression[MakeYMInterval]("make_ym_interval"),
@@ -712,7 +722,7 @@ object FunctionRegistry {
     expression[CsvToStructs]("from_csv"),
     expression[SchemaOfCsv]("schema_of_csv"),
     expression[StructsToCsv]("to_csv")
-  )
+  ) ++ expressionsForTimestampNTZSupport
 
   val builtin: SimpleFunctionRegistry = {
     val fr = new SimpleFunctionRegistry
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index d0f63ba..46be6b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -297,10 +297,6 @@ package object dsl {
       /** Creates a new AttributeReference of type timestamp */
       def timestamp: AttributeReference = AttributeReference(s, TimestampType, nullable = true)()
 
-      /** Creates a new AttributeReference of type timestamp without time zone */
-      def timestampNTZ: AttributeReference =
-        AttributeReference(s, TimestampNTZType, nullable = true)()
-
       /** Creates a new AttributeReference of the day-time interval type */
       def dayTimeInterval(startField: Byte, endField: Byte): AttributeReference = {
         AttributeReference(s, DayTimeIntervalType(startField, endField), nullable = true)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d5cfa50..80477d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * A factory for constructing encoders that convert external row to/from the Spark SQL
@@ -105,7 +106,8 @@ object RowEncoder {
         createSerializerForSqlTimestamp(inputObject)
       }
 
-    case TimestampNTZType => createSerializerForLocalDateTime(inputObject)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => createSerializerForLocalDateTime(inputObject)
 
     case DateType =>
       if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -230,7 +232,8 @@ object RowEncoder {
       } else {
         ObjectType(classOf[java.sql.Timestamp])
       }
-    case TimestampNTZType =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       ObjectType(classOf[java.time.LocalDateTime])
     case DateType =>
       if (SQLConf.get.datetimeJava8ApiEnabled) {
@@ -287,7 +290,8 @@ object RowEncoder {
         createDeserializerForSqlTimestamp(input)
       }
 
-    case TimestampNTZType =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting =>
       createDeserializerForLocalDateTime(input)
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index ee40909..6b398ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -80,7 +80,9 @@ object Literal {
     case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
     case i: Instant => Literal(instantToMicros(i), TimestampType)
     case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
-    case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case l: LocalDateTime if Utils.isTesting =>
+      Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampNTZType)
     case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType())
@@ -120,7 +122,8 @@ object Literal {
     case _ if clz == classOf[Date] => DateType
     case _ if clz == classOf[Instant] => TimestampType
     case _ if clz == classOf[Timestamp] => TimestampType
-    case _ if clz == classOf[LocalDateTime] => TimestampNTZType
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case _ if clz == classOf[LocalDateTime] && Utils.isTesting => TimestampNTZType
     case _ if clz == classOf[Duration] => DayTimeIntervalType()
     case _ if clz == classOf[Period] => YearMonthIntervalType()
     case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
@@ -185,7 +188,8 @@ object Literal {
     case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
     case DateType => create(0, DateType)
     case TimestampType => create(0L, TimestampType)
-    case TimestampNTZType => create(0L, TimestampNTZType)
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    case TimestampNTZType if Utils.isTesting => create(0L, TimestampNTZType)
     case it: DayTimeIntervalType => create(0L, it)
     case it: YearMonthIntervalType => create(0, it)
     case StringType => Literal("")
@@ -207,7 +211,9 @@ object Literal {
       case ByteType => v.isInstanceOf[Byte]
       case ShortType => v.isInstanceOf[Short]
       case IntegerType | DateType | _: YearMonthIntervalType => v.isInstanceOf[Int]
-      case LongType | TimestampType | TimestampNTZType | _: DayTimeIntervalType =>
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case TimestampNTZType if Utils.isTesting => v.isInstanceOf[Long]
+      case LongType | TimestampType | _: DayTimeIntervalType =>
         v.isInstanceOf[Long]
       case FloatType => v.isInstanceOf[Float]
       case DoubleType => v.isInstanceOf[Double]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d213549..a431758 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+import org.apache.spark.util.Utils.isTesting
 import org.apache.spark.util.random.RandomSampler
 
 /**
@@ -2131,10 +2132,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
           val zoneId = getZoneId(conf.sessionLocalTimeZone)
           val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType))
           specialDate.getOrElse(toLiteral(stringToDate, DateType))
-        case "TIMESTAMP_NTZ" =>
+        // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+        case "TIMESTAMP_NTZ" if isTesting =>
           val specialTs = convertSpecialTimestampNTZ(value).map(Literal(_, TimestampNTZType))
           specialTs.getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType))
-        case "TIMESTAMP_LTZ" =>
+        case "TIMESTAMP_LTZ" if isTesting =>
           constructTimestampLTZLiteral(value)
         case "TIMESTAMP" =>
           SQLConf.get.timestampType match {
@@ -2573,8 +2575,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       case ("double", Nil) => DoubleType
       case ("date", Nil) => DateType
       case ("timestamp", Nil) => SQLConf.get.timestampType
-      case ("timestamp_ntz", Nil) => TimestampNTZType
-      case ("timestamp_ltz", Nil) => TimestampType
+      // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+      case ("timestamp_ntz", Nil) if isTesting => TimestampNTZType
+      case ("timestamp_ltz", Nil) if isTesting => TimestampType
       case ("string", Nil) => StringType
       case ("character" | "char", length :: Nil) => CharType(length.getText.toInt)
       case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 14c4e55..b3557ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2884,9 +2884,10 @@ object SQLConf {
         s"and type literal. Setting the configuration as ${TimestampTypes.TIMESTAMP_NTZ} will " +
         "use TIMESTAMP WITHOUT TIME ZONE as the default type while putting it as " +
         s"${TimestampTypes.TIMESTAMP_LTZ} will use TIMESTAMP WITH LOCAL TIME ZONE. " +
-        "Before the 3.2.0 release, Spark only supports the TIMESTAMP WITH " +
+        "Before the 3.3.0 release, Spark only supports the TIMESTAMP WITH " +
         "LOCAL TIME ZONE type.")
-      .version("3.2.0")
+      .version("3.3.0")
+      .internal()
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(TimestampTypes.values.map(_.toString))
@@ -3975,12 +3976,14 @@ class SQLConf extends Serializable with Logging {
   def ansiEnabled: Boolean = getConf(ANSI_ENABLED)
 
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
-    case "TIMESTAMP_LTZ" =>
+    // SPARK-36227: Remove TimestampNTZ type support in Spark 3.2 with minimal code changes.
+    // The configuration `TIMESTAMP_TYPE` is only effective for testing in Spark 3.2.
+    case "TIMESTAMP_NTZ" if Utils.isTesting =>
+      TimestampNTZType
+
+    case _ =>
       // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
       TimestampType
-
-    case "TIMESTAMP_NTZ" =>
-      TimestampNTZType
   }
 
   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index a3004ca..90188ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -82,9 +82,6 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   /** @since 3.0.0 */
   implicit def newLocalDateEncoder: Encoder[java.time.LocalDate] = Encoders.LOCALDATE
 
-  /** @since 3.2.0 */
-  implicit def newLocalDateTimeEncoder: Encoder[java.time.LocalDateTime] = Encoders.LOCALDATETIME
-
   /** @since 2.2.0 */
   implicit def newTimeStampEncoder: Encoder[java.sql.Timestamp] = Encoders.TIMESTAMP
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 28439f2..9a31616 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -555,14 +555,6 @@ public class JavaDatasetSuite implements Serializable {
   }
 
   @Test
-  public void testLocalDateTimeEncoder() {
-    Encoder<LocalDateTime> encoder = Encoders.LOCALDATETIME();
-    List<LocalDateTime> data = Arrays.asList(LocalDateTime.of(1, 1, 1, 1, 1));
-    Dataset<LocalDateTime> ds = spark.createDataset(data, encoder);
-    Assert.assertEquals(data, ds.collectAsList());
-  }
-
-  @Test
   public void testDurationEncoder() {
     Encoder<Duration> encoder = Encoders.DURATION();
     List<Duration> data = Arrays.asList(Duration.ofDays(0));
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index d0a122e..7a9f980 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.time.{Duration, LocalDateTime, Period}
+import java.time.{Duration, Period}
 
 import scala.util.Random
 
@@ -1416,17 +1416,6 @@ class DataFrameAggregateSuite extends QueryTest
     val df2 = Seq(Period.ofYears(1)).toDF("a").groupBy("a").count()
     checkAnswer(df2, Row(Period.ofYears(1), 1))
   }
-
-  test("SPARK-36054: Support group by TimestampNTZ column") {
-    val ts1 = "2021-01-01T00:00:00"
-    val ts2 = "2021-01-01T00:00:01"
-    val localDateTime = Seq(ts1, ts1, ts2).map(LocalDateTime.parse)
-    val df = localDateTime.toDF("ts").groupBy("ts").count().orderBy("ts")
-    val expectedSchema =
-      new StructType().add(StructField("ts", TimestampNTZType)).add("count", LongType, false)
-    assert (df.schema == expectedSchema)
-    checkAnswer(df, Seq(Row(LocalDateTime.parse(ts1), 2), Row(LocalDateTime.parse(ts2), 1)))
-  }
 }
 
 case class B(c: Option[Double])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5f71390..6706a1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2008,11 +2008,6 @@ class DatasetSuite extends QueryTest
     checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
   }
 
-  test("SPARK-35664: implicit encoder for java.time.LocalDateTime") {
-    val localDateTime = java.time.LocalDateTime.parse("2021-06-08T12:31:58.999999")
-    assert(Seq(localDateTime).toDS().head() === localDateTime)
-  }
-
   test("SPARK-34605: implicit encoder for java.time.Duration") {
     val duration = java.time.Duration.ofMinutes(10)
     assert(spark.range(1).map { _ => duration }.head === duration)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9d32d6f..aad1060 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -847,34 +847,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-35674: using java.time.LocalDateTime in UDF") {
-    // Regular case
-    val input = Seq(java.time.LocalDateTime.parse("2021-01-01T00:00:00")).toDF("dateTime")
-    val plusYear = udf((l: java.time.LocalDateTime) => l.plusYears(1))
-    val result = input.select(plusYear($"dateTime").as("newDateTime"))
-    checkAnswer(result, Row(java.time.LocalDateTime.parse("2022-01-01T00:00:00")) :: Nil)
-    assert(result.schema === new StructType().add("newDateTime", TimestampNTZType))
-    // UDF produces `null`
-    val nullFunc = udf((_: java.time.LocalDateTime) => null.asInstanceOf[java.time.LocalDateTime])
-    val nullResult = input.select(nullFunc($"dateTime").as("nullDateTime"))
-    checkAnswer(nullResult, Row(null) :: Nil)
-    assert(nullResult.schema === new StructType().add("nullDateTime", TimestampNTZType))
-    // Input parameter of UDF is null
-    val nullInput = Seq(null.asInstanceOf[java.time.LocalDateTime]).toDF("nullDateTime")
-    val constDuration = udf((_: java.time.LocalDateTime) =>
-      java.time.LocalDateTime.parse("2021-01-01T00:00:00"))
-    val constResult = nullInput.select(constDuration($"nullDateTime").as("firstDayOf2021"))
-    checkAnswer(constResult, Row(java.time.LocalDateTime.parse("2021-01-01T00:00:00")) :: Nil)
-    assert(constResult.schema === new StructType().add("firstDayOf2021", TimestampNTZType))
-    // Error in the conversion of UDF result to the internal representation of timestamp without
-    // time zone
-    val overflowFunc = udf((l: java.time.LocalDateTime) => l.plusDays(Long.MaxValue))
-    val e = intercept[SparkException] {
-      input.select(overflowFunc($"dateTime")).collect()
-    }.getCause.getCause
-    assert(e.isInstanceOf[java.lang.ArithmeticException])
-  }
-
   test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
     // Regular case
     val input = Seq(java.time.Duration.ofHours(23)).toDF("d")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org