This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 47cb1f3 [SPARK-29949][SQL][2.4] Fix formatting of timestamps by JSON/CSV datasources 47cb1f3 is described below commit 47cb1f359af62383e24198dbbaa0b4503348cd04 Author: Maxim Gekk <max.g...@gmail.com> AuthorDate: Tue Nov 19 17:10:16 2019 +0800 [SPARK-29949][SQL][2.4] Fix formatting of timestamps by JSON/CSV datasources ### What changes were proposed in this pull request? In the PR, I propose to use the `format()` method of `FastDateFormat` which accepts an instance of the `Calendar` type. This allows adjusting the `MILLISECOND` field of the calendar directly before formatting. I added a new method `format()` to `DateTimeUtils.TimestampParser`. This method splits the input timestamp into a part truncated to seconds and the seconds fractional part. The calendar is initialized by the first part in the normal way, and the last one is converted to a form appropria [...] I refactored `MicrosCalendar` by passing the number of digits from the fraction pattern as a parameter to the default constructor because it is used by the existing `getMicros()` and the new one `setMicros()`. `setMicros()` is used to set the seconds fraction to the calendar's `MILLISECOND` field directly before formatting. This PR supports various patterns for seconds fractions from `S` up to `SSSSSS`. If the pattern has more than 6 `S`, the first 6 digits reflect the milliseconds and microseconds of the input timestamp, but the remaining digits are set to `0`. ### Why are the changes needed? This fixes a bug of incorrectly formatting timestamps in microsecond precision. 
For example: ```scala Seq(java.sql.Timestamp.valueOf("2019-11-18 11:56:00.123456")).toDF("t") .select(to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS")).as("json")) .show(false) +----------------------------------+ |json | +----------------------------------+ |{"t":"2019-11-18 11:56:00.000123"}| +----------------------------------+ ``` ### Does this PR introduce any user-facing change? Yes. The example above outputs: ```scala +----------------------------------+ |json | +----------------------------------+ |{"t":"2019-11-18 11:56:00.123456"}| +----------------------------------+ ``` ### How was this patch tested? - By new tests for formatting by different patterns from `S` to `SSSSSS` in `DateTimeUtilsSuite` - A test for `to_json()` in `JsonFunctionsSuite` - A roundtrip test for writing and reading back a timestamp in a CSV file. Closes #26582 from MaxGekk/micros-format-2.4. Authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/json/JacksonGenerator.scala | 6 ++-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 35 ++++++++++++++----- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 40 ++++++++++++++++++++++ .../datasources/csv/UnivocityGenerator.scala | 6 ++-- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 7 ++++ .../sql/execution/datasources/csv/CSVSuite.scala | 15 ++++++++ 6 files changed, 97 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9b86d86..a379f86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.types._ /** @@ -74,6 +75,8 @@ private[sql] class JacksonGenerator( private val lineSeparator: String = options.lineSeparatorInWrite + @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat) + private def makeWriter(dataType: DataType): ValueWriter = dataType match { case NullType => (row: SpecializedGetters, ordinal: Int) => @@ -113,8 +116,7 @@ private[sql] class JacksonGenerator( case TimestampType => (row: SpecializedGetters, ordinal: Int) => - val timestampString = - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + val timestampString = timestampParser.format(row.getLong(ordinal)) gen.writeString(timestampString) case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index d2bb595..f6993ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -1173,12 +1173,16 @@ object DateTimeUtils { * protected `fields` immediately after parsing. We cannot use * the `get()` method because it performs normalization of the fraction * part. Accordingly, the `MILLISECOND` field doesn't contain original value. + * + * Also this class allows to set raw value to the `MILLISECOND` field + * directly before formatting. */ - private class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) { + private class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) + extends GregorianCalendar(tz, Locale.US) { // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision. 
// For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and // if the `MILLISECOND` field was parsed to `1234`. - def getMicros(digitsInFraction: Int): SQLTimestamp = { + def getMicros(): SQLTimestamp = { // Append 6 zeros to the field: 1234 -> 1234000000 val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND // Take the first 6 digits from `d`: 1234000000 -> 123400 @@ -1186,24 +1190,39 @@ object DateTimeUtils { // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction) d / Decimal.POW_10(digitsInFraction) } + + // Converts the seconds fraction in microsecond precision to a value + // that can be correctly formatted according to the specified fraction pattern. + // The method performs operations opposite to `getMicros()`. + def setMicros(micros: Long): Unit = { + val d = micros * Decimal.POW_10(digitsInFraction) + fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt + } } /** * An instance of the class is aimed to re-use many times. It contains helper objects - * `cal` and `digitsInFraction` that are reused between `parse()` invokes. + * `cal` which is reused between `parse()` and `format` invokes. 
*/ - class TimestampParser(format: FastDateFormat) { - private val digitsInFraction = format.getPattern.count(_ == 'S') - private val cal = new MicrosCalendar(format.getTimeZone) + class TimestampParser(fastDateFormat: FastDateFormat) { + private val cal = new MicrosCalendar( + fastDateFormat.getTimeZone, + fastDateFormat.getPattern.count(_ == 'S')) def parse(s: String): SQLTimestamp = { cal.clear() // Clear the calendar because it can be re-used many times - if (!format.parse(s, new ParsePosition(0), cal)) { + if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { throw new IllegalArgumentException(s"'$s' is an invalid timestamp") } - val micros = cal.getMicros(digitsInFraction) + val micros = cal.getMicros() cal.set(Calendar.MILLISECOND, 0) cal.getTimeInMillis * MICROS_PER_MILLIS + micros } + + def format(timestamp: SQLTimestamp): String = { + cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) + cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) + fastDateFormat.format(cal) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index ced003c..7eb3d2b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -731,4 +731,44 @@ class DateTimeUtilsSuite extends SparkFunSuite { } } } + + test("formatting timestamp strings up to microsecond precision") { + DateTimeTestUtils.outstandingTimezones.foreach { timeZone => + def check(pattern: String, input: String, expected: String): Unit = { + val parser = new TimestampParser(FastDateFormat.getInstance(pattern, timeZone, Locale.US)) + val timestamp = DateTimeUtils.stringToTimestamp( + UTF8String.fromString(input), timeZone).get + val actual = parser.format(timestamp) + assert(actual === expected) + } 
+ + check( + "yyyy-MM-dd HH:mm:ss.SSSSSSS", "2019-10-14T09:39:07.123456", + "2019-10-14 09:39:07.1234560") + check( + "yyyy-MM-dd HH:mm:ss.SSSSSS", "1960-01-01T09:39:07.123456", + "1960-01-01 09:39:07.123456") + check( + "yyyy-MM-dd HH:mm:ss.SSSSS", "0001-10-14T09:39:07.1", + "0001-10-14 09:39:07.10000") + check( + "yyyy-MM-dd HH:mm:ss.SSSS", "9999-12-31T23:59:59.999", + "9999-12-31 23:59:59.9990") + check( + "yyyy-MM-dd HH:mm:ss.SSS", "1970-01-01T00:00:00.0101", + "1970-01-01 00:00:00.010") + check( + "yyyy-MM-dd HH:mm:ss.SS", "2019-10-14T09:39:07.09", + "2019-10-14 09:39:07.09") + check( + "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07.2", + "2019-10-14 09:39:07.2") + check( + "yyyy-MM-dd HH:mm:ss.S", "2019-10-14T09:39:07", + "2019-10-14 09:39:07.0") + check( + "yyyy-MM-dd HH:mm:ss", "2019-10-14T09:39:07.123456", + "2019-10-14 09:39:07") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala index 4082a0d..3118091 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimestampParser import org.apache.spark.sql.types._ private[csv] class UnivocityGenerator( @@ -42,14 +43,15 @@ private[csv] class UnivocityGenerator( private val valueConverters: Array[ValueConverter] = schema.map(_.dataType).map(makeConverter).toArray + @transient private lazy val timestampParser = new TimestampParser(options.timestampFormat) + private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => (row: InternalRow, ordinal: 
Int) => options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) case TimestampType => - (row: InternalRow, ordinal: Int) => - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + (row: InternalRow, ordinal: Int) => timestampParser.format(row.getLong(ordinal)) case udt: UserDefinedType[_] => makeConverter(udt.sqlType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 35087ce..b1f7446 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -528,4 +528,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { df.select(from_json($"value", schema, options)), Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456")))) } + + test("to_json - timestamp in micros") { + val s = "2019-11-18 11:56:00.123456" + val df = Seq(java.sql.Timestamp.valueOf(s)).toDF("t").select( + to_json(struct($"t"), Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSSSS"))) + checkAnswer(df, Row(s"""{"t":"$s"}""")) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 95c9dc5..2ea8f4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1887,4 +1887,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te checkAnswer(readback, Row(Timestamp.valueOf(t))) } } + + test("Roundtrip in reading and writing timestamps in microsecond precision") { + withTempPath { path => + val timestamp = Timestamp.valueOf("2019-11-18 11:56:00.123456") + Seq(timestamp).toDF("t") + .write + .option("timestampFormat", "yyyy-MM-dd 
HH:mm:ss.SSSSSS") + .csv(path.getAbsolutePath) + val readback = spark.read + .schema("t timestamp") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .csv(path.getAbsolutePath) + checkAnswer(readback, Row(timestamp)) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org