This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 79d3bc0 [SPARK-27438][SQL] Parse strings with timestamps by to_timestamp() in microsecond precision 79d3bc0 is described below commit 79d3bc0409ccf978126d9c2bc0740ace196fe55b Author: Maxim Gekk <max.g...@gmail.com> AuthorDate: Mon Apr 22 19:41:32 2019 +0800 [SPARK-27438][SQL] Parse strings with timestamps by to_timestamp() in microsecond precision ## What changes were proposed in this pull request? In this PR, I propose to parse strings to timestamps with microsecond precision in the `to_timestamp()` function if the specified pattern contains a sub-pattern for fractions of a second. Closes #24342 ## How was this patch tested? By `DateFunctionsSuite` and `DateExpressionsSuite` Closes #24420 from MaxGekk/to_timestamp-microseconds3. Authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/expressions/datetimeExpressions.scala | 46 ++++++++++++++++------ sql/core/benchmarks/DateTimeBenchmark-results.txt | 20 +++++----- .../org/apache/spark/sql/DateFunctionsSuite.scala | 15 ++++++- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 9a6e6c7..1e6a3aa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -626,9 +626,13 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op override def prettyName: String = "unix_timestamp" } -abstract class UnixTime +abstract class ToTimestamp extends BinaryExpression with TimeZoneAwareExpression with ExpectsInputTypes { + // The result of the conversion to timestamp is microseconds divided by this factor.
+ // For example if the factor is 1000000, the result of the expression is in seconds. + protected def downScaleFactor: Long + override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(StringType, DateType, TimestampType), StringType) @@ -650,16 +654,16 @@ abstract class UnixTime } else { left.dataType match { case DateType => - DateTimeUtils.daysToMillis(t.asInstanceOf[Int], timeZone) / MILLIS_PER_SECOND + epochDaysToMicros(t.asInstanceOf[Int], zoneId) / downScaleFactor case TimestampType => - t.asInstanceOf[Long] / MICROS_PER_SECOND + t.asInstanceOf[Long] / downScaleFactor case StringType if right.foldable => if (constFormat == null || formatter == null) { null } else { try { formatter.parse( - t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND + t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { case NonFatal(_) => null } @@ -672,7 +676,7 @@ abstract class UnixTime val formatString = f.asInstanceOf[UTF8String].toString try { TimestampFormatter(formatString, zoneId).parse( - t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND + t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { case NonFatal(_) => null } @@ -697,7 +701,7 @@ abstract class UnixTime $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { try { - ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $MICROS_PER_SECOND; + ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $downScaleFactor; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -717,7 +721,7 @@ abstract class UnixTime s""" try { ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale) - .parse($string.toString()) / $MICROS_PER_SECOND; + .parse($string.toString()) / $downScaleFactor; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -736,10 +740,10 @@ abstract class UnixTime boolean ${ev.isNull} = 
${eval1.isNull}; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { - ${ev.value} = ${eval1.value} / $MICROS_PER_SECOND; + ${ev.value} = ${eval1.value} / $downScaleFactor; }""") case DateType => - val tz = ctx.addReferenceObj("timeZone", timeZone) + val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") val eval1 = left.genCode(ctx) ev.copy(code = code""" @@ -747,12 +751,16 @@ abstract class UnixTime boolean ${ev.isNull} = ${eval1.isNull}; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { - ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / $MILLIS_PER_SECOND; + ${ev.value} = $dtu.epochDaysToMicros(${eval1.value}, $zid) / $downScaleFactor; }""") } } } +abstract class UnixTime extends ToTimestamp { + override val downScaleFactor: Long = MICROS_PER_SECOND +} + /** * Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string * representing the timestamp of that moment in the current system time zone in the given @@ -1357,7 +1365,7 @@ case class ParseToTimestamp(left: Expression, format: Option[Expression], child: extends RuntimeReplaceable { def this(left: Expression, format: Expression) = { - this(left, Option(format), Cast(UnixTimestamp(left, format), TimestampType)) + this(left, Option(format), GetTimestamp(left, format)) } def this(left: Expression) = this(left, None, Cast(left, TimestampType)) @@ -1581,3 +1589,19 @@ case class DateDiff(endDate: Expression, startDate: Expression) defineCodeGen(ctx, ev, (end, start) => s"$end - $start") } } + +/** + * Gets timestamps from strings using given pattern. 
+ */ +private case class GetTimestamp( + left: Expression, + right: Expression, + timeZoneId: Option[String] = None) + extends ToTimestamp { + + override val downScaleFactor = 1 + override def dataType: DataType = TimestampType + + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = + copy(timeZoneId = Option(timeZoneId)) +} diff --git a/sql/core/benchmarks/DateTimeBenchmark-results.txt b/sql/core/benchmarks/DateTimeBenchmark-results.txt index d994752..1a58b05 100644 --- a/sql/core/benchmarks/DateTimeBenchmark-results.txt +++ b/sql/core/benchmarks/DateTimeBenchmark-results.txt @@ -385,19 +385,19 @@ to timestamp str: Best/Avg Time(ms) Rate(M/s) Per Ro to timestamp str wholestage off 165 / 166 6.1 164.7 1.0X to timestamp str wholestage on 160 / 163 6.2 160.5 1.0X -Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3 +Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz -to_timestamp: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------- -to_timestamp wholestage off 1316 / 1320 0.8 1315.7 1.0X -to_timestamp wholestage on 1288 / 1294 0.8 1287.5 1.0X +to_timestamp: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +to_timestamp wholestage off 1308 1353 64 0.8 1307.9 1.0X +to_timestamp wholestage on 1197 1230 21 0.8 1197.0 1.1X -Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3 +Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz -to_unix_timestamp: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------- -to_unix_timestamp wholestage off 1295 / 1297 0.8 1295.1 1.0X -to_unix_timestamp 
wholestage on 1409 / 1414 0.7 1409.2 0.9X +to_unix_timestamp: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +to_unix_timestamp wholestage off 1221 1224 4 0.8 1221.0 1.0X +to_unix_timestamp wholestage on 1224 1228 4 0.8 1223.8 1.0X Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 29cef69..3f91b91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat +import java.time.Instant import java.util.Locale import java.util.concurrent.TimeUnit @@ -638,6 +639,8 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { val ts2 = Timestamp.valueOf("2015-07-25 02:02:02") val s1 = "2015/07/24 10:00:00.5" val s2 = "2015/07/25 02:02:02.6" + val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5") + val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6") val ss1 = "2015-07-24 10:00:00" val ss2 = "2015-07-25 02:02:02" val fmt = "yyyy/MM/dd HH:mm:ss.S" @@ -648,7 +651,7 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(df.select(to_timestamp(col("ss"))), Seq( Row(ts1), Row(ts2))) checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq( - Row(ts1), Row(ts2))) + Row(ts1m), Row(ts2m))) checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq( Row(ts1), Row(ts2))) checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq( @@ -751,4 +754,14 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { Row(Timestamp.valueOf("2015-07-24 22:00:00")))) } } + + + 
test("to_timestamp with microseconds precision") { + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") { + val timestamp = "1970-01-01T00:00:00.123456Z" + val df = Seq(timestamp).toDF("t") + checkAnswer(df.select(to_timestamp($"t", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSX")), + Seq(Row(Instant.parse(timestamp)))) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org