This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4f36978 [SPARK-37360][SQL] Support TimestampNTZ in JSON data source 4f36978 is described below commit 4f369789bd5d6cc81a85fe01a37e0ae90cbdeb6c Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Mon Dec 6 13:24:46 2021 +0500 [SPARK-37360][SQL] Support TimestampNTZ in JSON data source ### What changes were proposed in this pull request? This PR adds support for TimestampNTZ type in the JSON data source. Most of the functionality has already been added; this patch verifies that writes + reads work for TimestampNTZ type and adds schema inference depending on the timestamp value format written. The following applies: - If there is a mixture of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values, use `TIMESTAMP_LTZ`. - If there are only `TIMESTAMP_NTZ` values, resolve using the default timestamp type configured with `spark.sql.timestampType`. In addition, I introduced a new JSON option `timestampNTZFormat`, which is similar to `timestampFormat` but allows configuring the read/write pattern for `TIMESTAMP_NTZ` types. It is basically a copy of the timestamp pattern but without the timezone. ### Why are the changes needed? The PR fixes issues when writing and reading TimestampNTZ to and from JSON. ### Does this PR introduce _any_ user-facing change? Previously, JSON data source would infer timestamp values as `TimestampType` when reading a JSON file. Now, the data source infers the timestamp value type based on the format (with or without timezone) and the default timestamp type configured with `spark.sql.timestampType`. A new JSON option `timestampNTZFormat` is added to control the way values are formatted during writes or parsed during reads. ### How was this patch tested? I extended `JsonSuite` with a few unit tests to verify that write-read roundtrip works for `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values. Closes #34638 from sadikovi/timestamp-ntz-support-json. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- docs/sql-data-sources-json.md | 10 +- .../spark/sql/catalyst/json/JSONOptions.scala | 9 +- .../spark/sql/catalyst/json/JacksonGenerator.scala | 2 +- .../spark/sql/catalyst/json/JacksonParser.scala | 4 +- .../spark/sql/catalyst/json/JsonInferSchema.scala | 12 ++ .../sql/execution/datasources/json/JsonSuite.scala | 194 ++++++++++++++++++++- 6 files changed, 216 insertions(+), 15 deletions(-) diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md index 5e3bd2b..b5f27aa 100644 --- a/docs/sql-data-sources-json.md +++ b/docs/sql-data-sources-json.md @@ -9,9 +9,9 @@ license: | The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -197,6 +197,12 @@ Data source options of JSON can be set via: <td>read/write</td> </tr> <tr> + <td><code>timestampNTZFormat</code></td> + <td>yyyy-MM-dd'T'HH:mm:ss[.SSS]</td> + <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. 
This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td> + <td>read/write</td> + </tr> + <tr> <td><code>multiLine</code></td> <td><code>false</code></td> <td>Parse one record, which may span multiple lines, per file.</td> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 029c014..e801912 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -106,6 +106,10 @@ private[sql] class JSONOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat") + val timestampNTZFormatInWrite: String = + parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") + val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) /** @@ -138,8 +142,9 @@ private[sql] class JSONOptions( val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false) /** - * Enables inferring of TimestampType from strings matched to the timestamp pattern - * defined by the timestampFormat option. + * Enables inferring of TimestampType and TimestampNTZType from strings matched to the + * corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options + * respectively. 
*/ val inferTimestamp: Boolean = parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index d00065b..336c0ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -91,7 +91,7 @@ private[sql] class JacksonGenerator( legacyFormat = FAST_DATE_FORMAT, isParsing = false) private val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInWrite, + options.timestampNTZFormatInWrite, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = false, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index cb6a079..3bce46b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -66,7 +66,7 @@ class JacksonParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) private lazy val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInRead, + options.timestampNTZFormatInRead, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = true, @@ -262,7 +262,7 @@ class JacksonParser( case TimestampNTZType => (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { case VALUE_STRING if parser.getTextLength >= 1 => - timestampNTZFormatter.parseWithoutTimeZone(parser.getText) + timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false) } case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 3b62b16..6a63118 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -46,6 +46,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { options.locale, legacyFormat = FAST_DATE_FORMAT, isParsing = true) + private val timestampNTZFormatter = TimestampFormatter( + options.timestampNTZFormatInRead, + options.zoneId, + legacyFormat = FAST_DATE_FORMAT, + isParsing = true, + forTimestampNTZ = true) private def handleJsonErrorsByParseMode(parseMode: ParseMode, columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = { @@ -145,6 +151,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { if (options.prefersDecimal && decimalTry.isDefined) { decimalTry.get } else if (options.inferTimestamp && + (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) { + SQLConf.get.timestampType + } else if (options.inferTimestamp && (allCatch opt timestampFormatter.parse(field)).isDefined) { TimestampType } else { @@ -393,6 +402,9 @@ object JsonInferSchema { case (t1: DecimalType, t2: IntegralType) => compatibleType(t1, DecimalType.forType(t2)) + case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) => + TimestampType + // strings and every string is a Json object. 
case (_, _) => StringType } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 075d6e9..3daad30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2746,6 +2746,188 @@ abstract class JsonSuite } } + test("SPARK-37360: Write and infer TIMESTAMP_NTZ values with a non-default pattern") { + withTempPath { path => + val exp = spark.sql(""" + select timestamp_ntz'2020-12-12 12:12:12' as col0 union all + select timestamp_ntz'2020-12-12 12:12:12.123456' as col0 + """) + exp.write + .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .json(path.getAbsolutePath) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + val res = spark.read + .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .option("inferTimestamp", "true") + .json(path.getAbsolutePath) + + assert(res.dtypes === exp.dtypes) + checkAnswer(res, exp) + } + } + } + + test("SPARK-37360: Write and infer TIMESTAMP_LTZ values with a non-default pattern") { + withTempPath { path => + val exp = spark.sql(""" + select timestamp_ltz'2020-12-12 12:12:12' as col0 union all + select timestamp_ltz'2020-12-12 12:12:12.123456' as col0 + """) + exp.write + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .json(path.getAbsolutePath) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + val res = spark.read + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .option("inferTimestamp", "true") + .json(path.getAbsolutePath) + + assert(res.dtypes === exp.dtypes) + checkAnswer(res, exp) + } + } + } + + test("SPARK-37360: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { + withTempPath { path => + 
val exp = spark.sql(""" + select + timestamp_ntz'2020-12-12 12:12:12' as col1, + timestamp_ltz'2020-12-12 12:12:12' as col2 + """) + + exp.write.json(path.getAbsolutePath) + + val res = spark.read + .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ") + .json(path.getAbsolutePath) + + checkAnswer(res, exp) + } + } + + test("SPARK-37360: Timestamp type inference for a column with TIMESTAMP_NTZ values") { + withTempPath { path => + val exp = spark.sql(""" + select timestamp_ntz'2020-12-12 12:12:12' as col0 union all + select timestamp_ntz'2020-12-12 12:12:12' as col0 + """) + + exp.write.json(path.getAbsolutePath) + + val timestampTypes = Seq( + SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, + SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) + + for (timestampType <- timestampTypes) { + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { + val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath) + + if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + checkAnswer(res, exp) + } else { + checkAnswer( + res, + spark.sql(""" + select timestamp_ltz'2020-12-12 12:12:12' as col0 union all + select timestamp_ltz'2020-12-12 12:12:12' as col0 + """) + ) + } + } + } + } + } + + test("SPARK-37360: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") { + withTempPath { path => + Seq( + """{"col0":"2020-12-12T12:12:12.000"}""", + """{"col0":"2020-12-12T17:12:12.000Z"}""", + """{"col0":"2020-12-12T17:12:12.000+05:00"}""", + """{"col0":"2020-12-12T12:12:12.000"}""" + ).toDF("data") + .coalesce(1) + .write.text(path.getAbsolutePath) + + for (policy <- Seq("exception", "corrected", "legacy")) { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) { + val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath) + + // NOTE: + // Every value is tested for all types in JSON schema inference so the sequence of + // ["timestamp_ntz", "timestamp_ltz", "timestamp_ntz"] results in "timestamp_ltz". 
+ // This is different from CSV where inference starts from the last inferred type. + // + // This is why the similar test in CSV has a different result in "legacy" mode. + + val exp = spark.sql(""" + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 + """) + checkAnswer(res, exp) + } + } + } + } + + test("SPARK-37360: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") { + withTempPath { path => + Seq( + """{"col0": "2020-12-12T12:12:12.000"}""", + """{"col0": "2020-12-12T12:12:12.000Z"}""", + """{"col0": "2020-12-12T12:12:12.000+05:00"}""", + """{"col0": "2020-12-12T12:12:12.000"}""" + ).toDF("data") + .coalesce(1) + .write.text(path.getAbsolutePath) + + for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) { + val reader = spark.read.schema("col0 TIMESTAMP_NTZ") + val res = timestampNTZFormat match { + case Some(format) => + reader.option("timestampNTZFormat", format).json(path.getAbsolutePath) + case None => + reader.json(path.getAbsolutePath) + } + + checkAnswer( + res, + Seq( + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), + Row(null), + Row(null), + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) + ) + ) + } + } + } + + test("SPARK-37360: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") { + val patterns = Seq( + "yyyy-MM-dd HH:mm:ss XXX", + "yyyy-MM-dd HH:mm:ss Z", + "yyyy-MM-dd HH:mm:ss z") + + val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") + for (pattern <- patterns) { + withTempPath { path => + val err = intercept[SparkException] { + exp.write.option("timestampNTZFormat", pattern).json(path.getAbsolutePath) + } + assert( + err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") || + err.getCause.getMessage.contains("Unable to extract value") || + 
err.getCause.getMessage.contains("Unable to extract ZoneId")) + } + } + } + test("filters push down") { withTempPath { path => val t = "2019-12-17 00:01:02" @@ -2996,10 +3178,6 @@ abstract class JsonSuite } test("SPARK-36536: use casting when datetime pattern is not set") { - def isLegacy: Boolean = { - spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) == - SQLConf.LegacyBehaviorPolicy.LEGACY.toString - } withSQLConf( SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true", SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) { @@ -3017,13 +3195,13 @@ abstract class JsonSuite readback, Seq( Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)), + LocalDateTime.of(2021, 10, 1, 0, 0, 0)), Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"), - if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) + LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org