This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f36978  [SPARK-37360][SQL] Support TimestampNTZ in JSON data source
4f36978 is described below

commit 4f369789bd5d6cc81a85fe01a37e0ae90cbdeb6c
Author: Ivan Sadikov <ivan.sadi...@databricks.com>
AuthorDate: Mon Dec 6 13:24:46 2021 +0500

    [SPARK-37360][SQL] Support TimestampNTZ in JSON data source
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for TimestampNTZ type in the JSON data source.
    
    Most of the functionality had already been added; this patch verifies that
    writes and reads work for the TimestampNTZ type and adds schema inference
    based on the format of the written timestamp values. The following rules
    apply (see the sketch below):
    - If there is a mixture of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values, use
      `TIMESTAMP_LTZ`.
    - If there are only `TIMESTAMP_NTZ` values, resolve using the default
      timestamp type configured with `spark.sql.timestampType`.
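
    A hedged sketch of these rules (the session setup, paths, and literal
    values here are illustrative, not part of the patch):

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // One value without a zone offset and one with: the merged column type
    // becomes TIMESTAMP_LTZ.
    Seq("""{"ts": "2021-01-01T00:00:00"}""",
        """{"ts": "2021-01-01T00:00:00Z"}""")
      .toDF("value").write.text("/tmp/mixed-ts")

    // With only zone-less values, inference would instead resolve to the type
    // configured via spark.sql.timestampType.
    spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
    val inferred = spark.read
      .option("inferTimestamp", "true")
      .json("/tmp/mixed-ts")
    inferred.printSchema()  // ts: timestamp -- TIMESTAMP_LTZ wins for this mix
    ```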
    
    In addition, I introduced a new JSON option `timestampNTZFormat`, which is
    similar to `timestampFormat` but configures the read/write pattern for
    `TIMESTAMP_NTZ` values. It is essentially a copy of the timestamp pattern
    without the timezone.
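
    A minimal sketch of the option in use (the output path is hypothetical;
    the pattern matches the ones exercised in the new tests):

    ```scala
    val df = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")

    // Write TIMESTAMP_NTZ values with a custom, zone-less pattern...
    df.write
      .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
      .json("/tmp/ntz")

    // ...and read them back with the same pattern and an explicit schema.
    val res = spark.read
      .schema("col0 TIMESTAMP_NTZ")
      .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
      .json("/tmp/ntz")
    ```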
    
    ### Why are the changes needed?
    
    The PR fixes issues when writing TimestampNTZ values to JSON and reading them back.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Previously, the JSON data source would infer timestamp values as
    `TimestampType` when reading a JSON file. Now, the data source infers the
    timestamp type from the value format (with or without a timezone) and from
    the default timestamp type configured via `spark.sql.timestampType`.
    
    A new JSON option `timestampNTZFormat` is added to control the way values 
are formatted during writes or parsed during reads.
    
    ### How was this patch tested?
    
    I extended `JsonSuite` with a few unit tests verifying that the write-read
    roundtrip works for `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values.
    
    Closes #34638 from sadikovi/timestamp-ntz-support-json.
    
    Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 docs/sql-data-sources-json.md                      |  10 +-
 .../spark/sql/catalyst/json/JSONOptions.scala      |   9 +-
 .../spark/sql/catalyst/json/JacksonGenerator.scala |   2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala    |   4 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  12 ++
 .../sql/execution/datasources/json/JsonSuite.scala | 194 ++++++++++++++++++++-
 6 files changed, 216 insertions(+), 15 deletions(-)

diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md
index 5e3bd2b..b5f27aa 100644
--- a/docs/sql-data-sources-json.md
+++ b/docs/sql-data-sources-json.md
@@ -9,9 +9,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
- 
+
      http://www.apache.org/licenses/LICENSE-2.0
- 
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -197,6 +197,12 @@ Data source options of JSON can be set via:
     <td>read/write</td>
   </tr>
   <tr>
+    <td><code>timestampNTZFormat</code></td>
+    <td>yyyy-MM-dd'T'HH:mm:ss[.SSS]</td>
+    <td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
+    <td>read/write</td>
+  </tr>
+  <tr>
     <td><code>multiLine</code></td>
     <td><code>false</code></td>
     <td>Parse one record, which may span multiple lines, per file.</td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 029c014..e801912 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -106,6 +106,10 @@ private[sql] class JSONOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
+  val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
+  val timestampNTZFormatInWrite: String =
+    parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   /**
@@ -138,8 +142,9 @@ private[sql] class JSONOptions(
   val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false)
 
   /**
-   * Enables inferring of TimestampType from strings matched to the timestamp pattern
-   * defined by the timestampFormat option.
+   * Enables inferring of TimestampType and TimestampNTZType from strings matched to the
+   * corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options
+   * respectively.
    */
   val inferTimestamp: Boolean = parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index d00065b..336c0ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -91,7 +91,7 @@ private[sql] class JacksonGenerator(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInWrite,
+    options.timestampNTZFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index cb6a079..3bce46b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -66,7 +66,7 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInRead,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
@@ -262,7 +262,7 @@ class JacksonParser(
     case TimestampNTZType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          timestampNTZFormatter.parseWithoutTimeZone(parser.getText)
+          timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
       }
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 3b62b16..6a63118 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -46,6 +46,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
+  private val timestampNTZFormatter = TimestampFormatter(
+    options.timestampNTZFormatInRead,
+    options.zoneId,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true,
+    forTimestampNTZ = true)
 
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -145,6 +151,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
         } else if (options.inferTimestamp &&
+            (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) {
+          SQLConf.get.timestampType
+        } else if (options.inferTimestamp &&
             (allCatch opt timestampFormatter.parse(field)).isDefined) {
           TimestampType
         } else {
@@ -393,6 +402,9 @@ object JsonInferSchema {
         case (t1: DecimalType, t2: IntegralType) =>
           compatibleType(t1, DecimalType.forType(t2))
 
+        case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
+          TimestampType
+
         // strings and every string is a Json object.
         case (_, _) => StringType
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 075d6e9..3daad30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2746,6 +2746,188 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-37360: Write and infer TIMESTAMP_NTZ values with a non-default pattern") {
+    withTempPath { path =>
+      val exp = spark.sql("""
+        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
+        select timestamp_ntz'2020-12-12 12:12:12.123456' as col0
+        """)
+      exp.write
+        .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .json(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+        val res = spark.read
+          .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+          .option("inferTimestamp", "true")
+          .json(path.getAbsolutePath)
+
+        assert(res.dtypes === exp.dtypes)
+        checkAnswer(res, exp)
+      }
+    }
+  }
+
+  test("SPARK-37360: Write and infer TIMESTAMP_LTZ values with a non-default pattern") {
+    withTempPath { path =>
+      val exp = spark.sql("""
+        select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
+        select timestamp_ltz'2020-12-12 12:12:12.123456' as col0
+        """)
+      exp.write
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .json(path.getAbsolutePath)
+
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
+        val res = spark.read
+          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+          .option("inferTimestamp", "true")
+          .json(path.getAbsolutePath)
+
+        assert(res.dtypes === exp.dtypes)
+        checkAnswer(res, exp)
+      }
+    }
+  }
+
+  test("SPARK-37360: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
+    withTempPath { path =>
+      val exp = spark.sql("""
+        select
+          timestamp_ntz'2020-12-12 12:12:12' as col1,
+          timestamp_ltz'2020-12-12 12:12:12' as col2
+        """)
+
+      exp.write.json(path.getAbsolutePath)
+
+      val res = spark.read
+        .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
+        .json(path.getAbsolutePath)
+
+      checkAnswer(res, exp)
+    }
+  }
+
+  test("SPARK-37360: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
+    withTempPath { path =>
+      val exp = spark.sql("""
+        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
+        select timestamp_ntz'2020-12-12 12:12:12' as col0
+        """)
+
+      exp.write.json(path.getAbsolutePath)
+
+      val timestampTypes = Seq(
+        SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+        SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+      for (timestampType <- timestampTypes) {
+        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+          val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)
+
+          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+            checkAnswer(res, exp)
+          } else {
+            checkAnswer(
+              res,
+              spark.sql("""
+                select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
+                select timestamp_ltz'2020-12-12 12:12:12' as col0
+                """)
+            )
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-37360: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
+    withTempPath { path =>
+      Seq(
+        """{"col0":"2020-12-12T12:12:12.000"}""",
+        """{"col0":"2020-12-12T17:12:12.000Z"}""",
+        """{"col0":"2020-12-12T17:12:12.000+05:00"}""",
+        """{"col0":"2020-12-12T12:12:12.000"}"""
+      ).toDF("data")
+        .coalesce(1)
+        .write.text(path.getAbsolutePath)
+
+      for (policy <- Seq("exception", "corrected", "legacy")) {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
+          val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)
+
+          // NOTE:
+          // Every value is tested for all types in JSON schema inference so the sequence of
+          // ["timestamp_ntz", "timestamp_ltz", "timestamp_ntz"] results in "timestamp_ltz".
+          // This is different from CSV where inference starts from the last inferred type.
+          //
+          // This is why the similar test in CSV has a different result in "legacy" mode.
+
+          val exp = spark.sql("""
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
+            select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
+            select timestamp_ltz'2020-12-12T12:12:12.000' as col0
+            """)
+          checkAnswer(res, exp)
+        }
+      }
+    }
+  }
+
+  test("SPARK-37360: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") {
+    withTempPath { path =>
+      Seq(
+        """{"col0": "2020-12-12T12:12:12.000"}""",
+        """{"col0": "2020-12-12T12:12:12.000Z"}""",
+        """{"col0": "2020-12-12T12:12:12.000+05:00"}""",
+        """{"col0": "2020-12-12T12:12:12.000"}"""
+      ).toDF("data")
+        .coalesce(1)
+        .write.text(path.getAbsolutePath)
+
+      for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) {
+        val reader = spark.read.schema("col0 TIMESTAMP_NTZ")
+        val res = timestampNTZFormat match {
+          case Some(format) =>
+            reader.option("timestampNTZFormat", format).json(path.getAbsolutePath)
+          case None =>
+            reader.json(path.getAbsolutePath)
+        }
+
+        checkAnswer(
+          res,
+          Seq(
+            Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)),
+            Row(null),
+            Row(null),
+            Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12))
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-37360: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") {
+    val patterns = Seq(
+      "yyyy-MM-dd HH:mm:ss XXX",
+      "yyyy-MM-dd HH:mm:ss Z",
+      "yyyy-MM-dd HH:mm:ss z")
+
+    val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
+    for (pattern <- patterns) {
+      withTempPath { path =>
+        val err = intercept[SparkException] {
+          exp.write.option("timestampNTZFormat", pattern).json(path.getAbsolutePath)
+        }
+        assert(
+          err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") ||
+          err.getCause.getMessage.contains("Unable to extract value") ||
+          err.getCause.getMessage.contains("Unable to extract ZoneId"))
+      }
+    }
+  }
+
   test("filters push down") {
     withTempPath { path =>
       val t = "2019-12-17 00:01:02"
@@ -2996,10 +3178,6 @@ abstract class JsonSuite
   }
 
   test("SPARK-36536: use casting when datetime pattern is not set") {
-    def isLegacy: Boolean = {
-      spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
-        SQLConf.LegacyBehaviorPolicy.LEGACY.toString
-    }
     withSQLConf(
       SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
@@ -3017,13 +3195,13 @@ abstract class JsonSuite
           readback,
           Seq(
             Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
-              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+              LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
             Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
-              if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+              LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
             Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
-              if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
+              LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
             Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
-              if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
+              LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
