This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 79d3bc0  [SPARK-27438][SQL] Parse strings with timestamps by to_timestamp() in microsecond precision
79d3bc0 is described below

commit 79d3bc0409ccf978126d9c2bc0740ace196fe55b
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Mon Apr 22 19:41:32 2019 +0800

    [SPARK-27438][SQL] Parse strings with timestamps by to_timestamp() in microsecond precision
    
    ## What changes were proposed in this pull request?
    
    In the PR, I propose to parse strings to timestamps in microsecond precision by the `to_timestamp()` function if the specified pattern contains a sub-pattern for the seconds fraction.
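    
    For illustration only (not part of the patch), a minimal usage sketch; the column name and input value are made up, and a `SparkSession` with `spark.implicits._` in scope is assumed:
    
    ```scala
    import org.apache.spark.sql.functions.{col, to_timestamp}
    
    // The 'SSSSSS' sub-pattern covers the fraction of seconds, so the parsed
    // timestamp is expected to keep the microseconds instead of truncating them.
    val df = Seq("2019-04-22 19:41:32.123456").toDF("t") // hypothetical input
    df.select(to_timestamp(col("t"), "yyyy-MM-dd HH:mm:ss.SSSSSS")).show(false)
    ```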
    
    Closes #24342
    
    ## How was this patch tested?
    
    By `DateFunctionsSuite` and `DateExpressionsSuite`
    
    Closes #24420 from MaxGekk/to_timestamp-microseconds3.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/expressions/datetimeExpressions.scala | 46 ++++++++++++++++------
 sql/core/benchmarks/DateTimeBenchmark-results.txt  | 20 +++++-----
 .../org/apache/spark/sql/DateFunctionsSuite.scala  | 15 ++++++-
 3 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 9a6e6c7..1e6a3aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -626,9 +626,13 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op
   override def prettyName: String = "unix_timestamp"
 }
 
-abstract class UnixTime
+abstract class ToTimestamp
   extends BinaryExpression with TimeZoneAwareExpression with ExpectsInputTypes {
 
+  // The result of the conversion to timestamp is microseconds divided by this factor.
+  // For example if the factor is 1000000, the result of the expression is in seconds.
+  protected def downScaleFactor: Long
+
   override def inputTypes: Seq[AbstractDataType] =
     Seq(TypeCollection(StringType, DateType, TimestampType), StringType)
 
@@ -650,16 +654,16 @@ abstract class UnixTime
     } else {
       left.dataType match {
         case DateType =>
-          DateTimeUtils.daysToMillis(t.asInstanceOf[Int], timeZone) / MILLIS_PER_SECOND
+          epochDaysToMicros(t.asInstanceOf[Int], zoneId) / downScaleFactor
         case TimestampType =>
-          t.asInstanceOf[Long] / MICROS_PER_SECOND
+          t.asInstanceOf[Long] / downScaleFactor
         case StringType if right.foldable =>
           if (constFormat == null || formatter == null) {
             null
           } else {
             try {
               formatter.parse(
-                t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
+                t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
               case NonFatal(_) => null
             }
@@ -672,7 +676,7 @@ abstract class UnixTime
             val formatString = f.asInstanceOf[UTF8String].toString
             try {
               TimestampFormatter(formatString, zoneId).parse(
-                t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
+                t.asInstanceOf[UTF8String].toString) / downScaleFactor
             } catch {
               case NonFatal(_) => null
             }
@@ -697,7 +701,7 @@ abstract class UnixTime
             $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
             if (!${ev.isNull}) {
               try {
-                ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $MICROS_PER_SECOND;
+                ${ev.value} = $formatterName.parse(${eval1.value}.toString()) / $downScaleFactor;
               } catch (java.lang.IllegalArgumentException e) {
                 ${ev.isNull} = true;
               } catch (java.text.ParseException e) {
@@ -717,7 +721,7 @@ abstract class UnixTime
           s"""
             try {
              ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale)
-                .parse($string.toString()) / $MICROS_PER_SECOND;
+                .parse($string.toString()) / $downScaleFactor;
             } catch (java.lang.IllegalArgumentException e) {
               ${ev.isNull} = true;
             } catch (java.text.ParseException e) {
@@ -736,10 +740,10 @@ abstract class UnixTime
           boolean ${ev.isNull} = ${eval1.isNull};
           $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
           if (!${ev.isNull}) {
-            ${ev.value} = ${eval1.value} / $MICROS_PER_SECOND;
+            ${ev.value} = ${eval1.value} / $downScaleFactor;
           }""")
       case DateType =>
-        val tz = ctx.addReferenceObj("timeZone", timeZone)
+        val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
         val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
         val eval1 = left.genCode(ctx)
         ev.copy(code = code"""
@@ -747,12 +751,16 @@ abstract class UnixTime
           boolean ${ev.isNull} = ${eval1.isNull};
           $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
           if (!${ev.isNull}) {
-            ${ev.value} = $dtu.daysToMillis(${eval1.value}, $tz) / $MILLIS_PER_SECOND;
+            ${ev.value} = $dtu.epochDaysToMicros(${eval1.value}, $zid) / $downScaleFactor;
           }""")
     }
   }
 }
 
+abstract class UnixTime extends ToTimestamp {
+  override val downScaleFactor: Long = MICROS_PER_SECOND
+}
+
 /**
  * Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string
  * representing the timestamp of that moment in the current system time zone in the given
@@ -1357,7 +1365,7 @@ case class ParseToTimestamp(left: Expression, format: Option[Expression], child:
   extends RuntimeReplaceable {
 
   def this(left: Expression, format: Expression) = {
-    this(left, Option(format), Cast(UnixTimestamp(left, format), TimestampType))
+    this(left, Option(format), GetTimestamp(left, format))
   }
 
   def this(left: Expression) = this(left, None, Cast(left, TimestampType))
@@ -1581,3 +1589,19 @@ case class DateDiff(endDate: Expression, startDate: Expression)
     defineCodeGen(ctx, ev, (end, start) => s"$end - $start")
   }
 }
+
+/**
+ * Gets timestamps from strings using given pattern.
+ */
+private case class GetTimestamp(
+    left: Expression,
+    right: Expression,
+    timeZoneId: Option[String] = None)
+  extends ToTimestamp {
+
+  override val downScaleFactor = 1
+  override def dataType: DataType = TimestampType
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+}
diff --git a/sql/core/benchmarks/DateTimeBenchmark-results.txt b/sql/core/benchmarks/DateTimeBenchmark-results.txt
index d994752..1a58b05 100644
--- a/sql/core/benchmarks/DateTimeBenchmark-results.txt
+++ b/sql/core/benchmarks/DateTimeBenchmark-results.txt
@@ -385,19 +385,19 @@ to timestamp str:                        Best/Avg Time(ms)    Rate(M/s)   Per Ro
 to timestamp str wholestage off                165 /  166          6.1         164.7       1.0X
 to timestamp str wholestage on                 160 /  163          6.2         160.5       1.0X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
-to_timestamp:                            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
-to_timestamp wholestage off                   1316 / 1320          0.8        1315.7       1.0X
-to_timestamp wholestage on                    1288 / 1294          0.8        1287.5       1.0X
+to_timestamp:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+to_timestamp wholestage off                        1308           1353          64          0.8        1307.9       1.0X
+to_timestamp wholestage on                         1197           1230          21          0.8        1197.0       1.1X
 
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
-to_unix_timestamp:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------
-to_unix_timestamp wholestage off              1295 / 1297          0.8        1295.1       1.0X
-to_unix_timestamp wholestage on               1409 / 1414          0.7        1409.2       0.9X
+to_unix_timestamp:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+to_unix_timestamp wholestage off                   1221           1224           4          0.8        1221.0       1.0X
+to_unix_timestamp wholestage on                    1224           1228           4          0.8        1223.8       1.0X
 
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.3
 Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 29cef69..3f91b91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
+import java.time.Instant
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -638,6 +639,8 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     val ts2 = Timestamp.valueOf("2015-07-25 02:02:02")
     val s1 = "2015/07/24 10:00:00.5"
     val s2 = "2015/07/25 02:02:02.6"
+    val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5")
+    val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6")
     val ss1 = "2015-07-24 10:00:00"
     val ss2 = "2015-07-25 02:02:02"
     val fmt = "yyyy/MM/dd HH:mm:ss.S"
@@ -648,7 +651,7 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(df.select(to_timestamp(col("ss"))), Seq(
       Row(ts1), Row(ts2)))
     checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq(
-      Row(ts1), Row(ts2)))
+      Row(ts1m), Row(ts2m)))
     checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq(
       Row(ts1), Row(ts2)))
     checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq(
@@ -751,4 +754,14 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
           Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
     }
   }
+
+
+  test("to_timestamp with microseconds precision") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val timestamp = "1970-01-01T00:00:00.123456Z"
+      val df = Seq(timestamp).toDF("t")
+      checkAnswer(df.select(to_timestamp($"t", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSX")),
+        Seq(Row(Instant.parse(timestamp))))
+    }
+  }
 }

