This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 47cb1f3  [SPARK-29949][SQL][2.4] Fix formatting of timestamps by JSON/CSV datasources
47cb1f3 is described below

commit 47cb1f359af62383e24198dbbaa0b4503348cd04
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Tue Nov 19 17:10:16 2019 +0800

    [SPARK-29949][SQL][2.4] Fix formatting of timestamps by JSON/CSV datasources
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to use the `format()` method of `FastDateFormat` which accepts an instance of the `Calendar` type. This allows adjusting the `MILLISECOND` field of the calendar directly before formatting. I added a new method `format()` to `DateTimeUtils.TimestampParser`. This method splits the input timestamp into a part truncated to seconds and the fractional seconds part. The calendar is initialized with the first part in the normal way, and the latter is converted to a form appropria [...]
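
    For illustration, a minimal sketch of that split (the constant `MICROS_PER_SECOND` comes from `DateTimeUtils`; the sample value is hypothetical):
    ```scala
    // Split a timestamp in microseconds into whole seconds and the seconds fraction.
    // floorDiv/floorMod (rather than / and %) keep the fraction non-negative for
    // pre-epoch timestamps.
    val MICROS_PER_SECOND = 1000000L
    val timestamp = -1L                                        // 1969-12-31 23:59:59.999999 UTC
    val seconds  = Math.floorDiv(timestamp, MICROS_PER_SECOND) // -1
    val fraction = Math.floorMod(timestamp, MICROS_PER_SECOND) // 999999
    ```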
    
    I refactored `MicrosCalendar` by passing the number of digits from the fraction pattern as a parameter to the default constructor, because it is used by both the existing `getMicros()` and the new `setMicros()`. `setMicros()` sets the seconds fraction in the calendar's `MILLISECOND` field directly before formatting.
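
    A standalone sketch of the two conversions, with `Decimal.POW_10` and `MICROS_PER_SECOND` inlined as plain constants (the real names appear in the diff below):
    ```scala
    val MICROS_PER_SECOND = 1000000L
    def pow10(n: Int): Long = math.pow(10, n).toLong

    // getMicros(): parsed `MILLISECOND` field -> seconds fraction in microseconds.
    // Pattern "SSSS" gives digitsInFraction = 4; a field parsed to 1234 means 0.1234s.
    val micros = 1234L * MICROS_PER_SECOND / pow10(4)          // 123400

    // setMicros(): seconds fraction in microseconds -> raw `MILLISECOND` field,
    // truncated to the precision the pattern can express.
    val field = (123400L * pow10(4) / MICROS_PER_SECOND).toInt // 1234
    ```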
    
    This PR supports various patterns for seconds fractions, from `S` up to `SSSSSS`. If the pattern has more than 6 `S` characters, the first 6 digits reflect the milliseconds and microseconds of the input timestamp, and the remaining digits are set to `0`.
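
    For example, with the 7-digit pattern `SSSSSSS`, a fraction of 123456 microseconds is formatted as `1234560` (a hypothetical check of the padding arithmetic, not code from the patch):
    ```scala
    // 123456 micros scaled to 7 digits: the 7th digit carries no data, so it is 0.
    val field7 = 123456L * math.pow(10, 7).toLong / 1000000L   // 1234560
    ```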
    
    ### Why are the changes needed?
    This fixes a bug where timestamps were formatted incorrectly in microsecond precision. Previously, `FastDateFormat` formatted the `java.sql.Timestamp` via its millisecond value only, so in the example below the `123` milliseconds are zero-padded to fill the 6-digit `SSSSSS` field and the microseconds are lost:
    ```scala
    Seq(java.sql.Timestamp.valueOf("2019-11-18 11:56:00.123456")).toDF("t")
      .select(to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS")).as("json"))
      .show(false)
    +----------------------------------+
    |json                              |
    +----------------------------------+
    |{"t":"2019-11-18 11:56:00.000123"}|
    +----------------------------------+
    ```
    
    ### Does this PR introduce any user-facing change?
    Yes. The example above outputs:
    ```scala
    +----------------------------------+
    |json                              |
    +----------------------------------+
    |{"t":"2019-11-18 11:56:00.123456"}|
    +----------------------------------+
    ```
    
    ### How was this patch tested?
    - New tests for formatting with different patterns from `S` to `SSSSSS` in `DateTimeUtilsSuite`
    - A test for `to_json()` in `JsonFunctionsSuite`
    - A roundtrip test for writing and reading back a timestamp in a CSV file
    
    Closes #26582 from MaxGekk/micros-format-2.4.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/json/JacksonGenerator.scala |  6 ++--
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 35 ++++++++++++++-----
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 40 ++++++++++++++++++++++
 .../datasources/csv/UnivocityGenerator.scala       |  6 ++--
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  7 ++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 15 ++++++++
 6 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index 9b86d86..a379f86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.types._
 
 /**
@@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(
 
   private val lineSeparator: String = options.lineSeparatorInWrite
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -113,8 +116,7 @@ private[sql] class JacksonGenerator(
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        val timestampString = timestampParser.format(row.getLong(ordinal))
         gen.writeString(timestampString)
 
     case DateType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d2bb595..f6993ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -1173,12 +1173,16 @@ object DateTimeUtils {
    * protected `fields` immediately after parsing. We cannot use
    * the `get()` method because it performs normalization of the fraction
    * part. Accordingly, the `MILLISECOND` field doesn't contain original value.
+   *
+   * Also this class allows to set raw value to the `MILLISECOND` field
+   * directly before formatting.
    */
-  private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
+  private class MicrosCalendar(tz: TimeZone, digitsInFraction: Int)
+    extends GregorianCalendar(tz, Locale.US) {
     // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision.
     // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and
     // if the `MILLISECOND` field was parsed to `1234`.
-    def getMicros(digitsInFraction: Int): SQLTimestamp = {
+    def getMicros(): SQLTimestamp = {
       // Append 6 zeros to the field: 1234 -> 1234000000
       val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
       // Take the first 6 digits from `d`: 1234000000 -> 123400
@@ -1186,24 +1190,39 @@ object DateTimeUtils {
       // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)
       d / Decimal.POW_10(digitsInFraction)
     }
+
+    // Converts the seconds fraction in microsecond precision to a value
+    // that can be correctly formatted according to the specified fraction pattern.
+    // The method performs operations opposite to `getMicros()`.
+    def setMicros(micros: Long): Unit = {
+      val d = micros * Decimal.POW_10(digitsInFraction)
+      fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt
+    }
   }
 
   /**
    * An instance of the class is aimed to re-use many times. It contains helper objects
-   * `cal` and `digitsInFraction` that are reused between `parse()` invokes.
+   * `cal` which is reused between `parse()` and `format` invokes.
    */
-  class TimestampParser(format: FastDateFormat) {
-    private val digitsInFraction = format.getPattern.count(_ == 'S')
-    private val cal = new MicrosCalendar(format.getTimeZone)
+  class TimestampParser(fastDateFormat: FastDateFormat) {
+    private val cal = new MicrosCalendar(
+      fastDateFormat.getTimeZone,
+      fastDateFormat.getPattern.count(_ == 'S'))
 
     def parse(s: String): SQLTimestamp = {
       cal.clear() // Clear the calendar because it can be re-used many times
-      if (!format.parse(s, new ParsePosition(0), cal)) {
+      if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
         throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
       }
-      val micros = cal.getMicros(digitsInFraction)
+      val micros = cal.getMicros()
       cal.set(Calendar.MILLISECOND, 0)
       cal.getTimeInMillis * MICROS_PER_MILLIS + micros
     }
+
+    def format(timestamp: SQLTimestamp): String = {
+      cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
+      cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND))
+      fastDateFormat.format(cal)
+    }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index ced003c..7eb3d2b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -731,4 +731,44 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("formatting timestamp strings up to microsecond precision") {
+    DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+      def check(pattern: String, input: String, expected: String): Unit = {
+        val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US))
+        val timestamp = DateTimeUtils.stringToTimestamp(
+          UTF8String.fromString(input), timeZone).get
+        val actual = parser.format(timestamp)
+        assert(actual === expected)
+      }
+
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSSSS", "2019-10-14T09:39:07.123456",
+        "2019-10-14 09:39:07.1234560")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSSS", "1960-01-01T09:39:07.123456",
+        "1960-01-01 09:39:07.123456")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSSS", "0001-10-14T09:39:07.1",
+        "0001-10-14 09:39:07.10000")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSSS", "9999-12-31T23:59:59.999",
+        "9999-12-31 23:59:59.9990")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SSS", "1970-01-01T00:00:00.0101",
+        "1970-01-01 00:00:00.010")
+      check(
+        "yyyy-MM-dd HH:mm:ss.SS", "2019-10-14T09:39:07.09",
+        "2019-10-14 09:39:07.09")
+      check(
+        "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07.2",
+        "2019-10-14 09:39:07.2")
+      check(
+        "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07",
+        "2019-10-14 09:39:07.0")
+      check(
+        "yyyy-MM-dd HH:mm:ss", "2019-10-14T09:39:07.123456",
+        "2019-10-14 09:39:07")
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index 4082a0d..3118091 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser
 import org.apache.spark.sql.types._
 
 private[csv] class UnivocityGenerator(
@@ -42,14 +43,15 @@ private[csv] class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
+  @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat)
+
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
       (row: InternalRow, ordinal: Int) =>
         options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) =>
-        options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+      (row: InternalRow, ordinal: Int) => timestampParser.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 35087ce..b1f7446 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -528,4 +528,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(from_json($"value", schema, options)),
       Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
   }
+
+  test("to_json - timestamp in micros") {
+    val s = "2019-11-18 11:56:00.123456"
+    val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select(
+      to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS")))
+    checkAnswer(df, Row(s"""{"t":"$s"}"""))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 95c9dc5..2ea8f4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1887,4 +1887,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
       checkAnswer(readback, Row(Timestamp.valueOf(t)))
     }
   }
+
+  test("Roundtrip in reading and writing timestamps in microsecond precision") 
{
+    withTempPath { path =>
+      val timestamp = Timestamp.valueOf("2019-11-18 11:56:00.123456")
+      Seq(timestamp).toDF("t")
+        .write
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .csv(path.getAbsolutePath)
+      val readback = spark.read
+        .schema("t timestamp")
+        .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
+        .csv(path.getAbsolutePath)
+      checkAnswer(readback, Row(timestamp))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
