This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d3e366  [SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time
3d3e366 is described below

commit 3d3e366aa836cb7d2295f54e78e544c7b15c9c08
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Mar 11 20:53:56 2020 +0800

    [SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to change the conversion of java.sql.Timestamp/Date values to/from the internal values of Catalyst's TimestampType/DateType for dates and timestamps before the Gregorian calendar cutover day `1582-10-15`. The new conversion builds a local date-time from the microseconds/days since the epoch, takes each date-time component (`year`, `month`, `day`, `hour`, `minute`, `second` and `second fraction`), and constructs java.sql.Timestamp/Date from the extracted components.
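    
    As a rough sketch of the idea (simplified relative to the actual `DateTimeUtils.toJavaTimestamp` change in the diff below, and assuming the JVM default time zone), the micros-to-Timestamp direction could look like this:
    ```scala
    import java.sql.Timestamp
    import java.time.{Instant, ZoneId}

    // Illustrative only: rebuild java.sql.Timestamp from the local date-time
    // components instead of from the raw instant.
    def microsToTimestampViaLocalDateTime(us: Long): Timestamp = {
      val instant = Instant.ofEpochSecond(
        Math.floorDiv(us, 1000000L), Math.floorMod(us, 1000000L) * 1000L)
      val ldt = instant.atZone(ZoneId.systemDefault()).toLocalDateTime
      // Timestamp.valueOf interprets the components in the hybrid
      // Julian/Gregorian calendar, so the local date-time is preserved.
      Timestamp.valueOf(ldt)
    }
    ```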
    
    ### Why are the changes needed?
    This rebases the underlying time/date offset so that collected java.sql.Timestamp/Date values have the same local date-time components as the original values in the Gregorian calendar.
    
    Here is an example that demonstrates the issue:
    ```scala
    scala> sql("select date '1100-10-10'").collect()
    res1: Array[org.apache.spark.sql.Row] = Array([1100-10-03])
    ```
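    
    The 7-day shift in the example above comes from the calendar mismatch: the date literal is resolved in the Proleptic Gregorian calendar, while java.sql.Date formats the resulting instant with the legacy hybrid Julian/Gregorian calendar. A small standalone illustration (not code from this patch, assuming the UTC time zone) of the differing epoch-day counts:
    ```scala
    import java.time.LocalDate
    import java.util.{Calendar, TimeZone}

    // Epoch days of 1100-10-10 in the Proleptic Gregorian calendar (java.time).
    val prolepticDays = LocalDate.of(1100, 10, 10).toEpochDay

    // Epoch days of the same local date in the hybrid Julian/Gregorian calendar.
    val hybridCal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(TimeZone.getTimeZone("UTC"))
      .setDate(1100, Calendar.OCTOBER, 10)
      .build()
    val hybridDays = Math.floorDiv(hybridCal.getTimeInMillis, 86400000L)

    // The two counts differ by 7 days, which is why the collected date
    // used to print as 1100-10-03.
    println(s"proleptic = $prolepticDays, hybrid = $hybridDays")
    ```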
    
    ### Does this PR introduce any user-facing change?
    Yes, after the changes:
    ```scala
    scala> sql("select date '1100-10-10'").collect()
    res0: Array[org.apache.spark.sql.Row] = Array([1100-10-10])
    ```
    
    ### How was this patch tested?
    By running `DateTimeUtilsSuite`, `DateFunctionsSuite` and `DateExpressionsSuite`.
    
    Closes #27807 from MaxGekk/rebase-timestamp-before-1582.
    
    Authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 39 +++++++++++--
 .../spark/sql/execution/HiveResultSuite.scala      |  6 +-
 .../execution/datasources/orc/OrcColumnVector.java |  3 +-
 .../execution/datasources/orc/OrcColumnVector.java |  3 +-
 .../org/apache/spark/sql/hive/HiveInspectors.scala | 66 ++++++++++++++++++++--
 5 files changed, 102 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index de4c24e..9f207ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -47,6 +47,15 @@ object DateTimeUtils {
   // it's 2440587.5, rounding up to compatible with Hive
   final val JULIAN_DAY_OF_EPOCH = 2440588
 
+  final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
+  final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
+    LocalDateTime.of(1582, 10, 15, 0, 0, 0)
+      .atOffset(ZoneOffset.UTC)
+      .toInstant)
+  final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)
+
+  final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")
+
   final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
   final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
 
@@ -86,28 +95,50 @@ object DateTimeUtils {
    * Returns the number of days since epoch from java.sql.Date.
    */
   def fromJavaDate(date: Date): SQLDate = {
-    microsToDays(millisToMicros(date.getTime))
+    if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
+      val era = if (date.before(julianCommonEraStart)) 0 else 1
+      val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
+      localDateToDays(localDate)
+    } else {
+      microsToDays(millisToMicros(date.getTime))
+    }
   }
 
   /**
    * Returns a java.sql.Date from number of days since epoch.
    */
   def toJavaDate(daysSinceEpoch: SQLDate): Date = {
-    new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+    if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
+      Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
+    } else {
+      new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+    }
   }
 
   /**
    * Returns a java.sql.Timestamp from number of micros since epoch.
    */
   def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
-    Timestamp.from(microsToInstant(us))
+    if (us < GREGORIAN_CUTOVER_MICROS) {
+      val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
+      Timestamp.valueOf(ldt)
+    } else {
+      Timestamp.from(microsToInstant(us))
+    }
   }
 
   /**
    * Returns the number of micros since epoch from java.sql.Timestamp.
    */
   def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
-    instantToMicros(t.toInstant)
+    if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
+      val era = if (t.before(julianCommonEraStart)) 0 else 1
+      val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
+      val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
+      instantToMicros(instant)
+    } else {
+      instantToMicros(t.toInstant)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bb59b12..bf7cbaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -23,7 +23,7 @@ class HiveResultSuite extends SharedSparkSession {
   import testImplicits._
 
   test("date formatting in hive result") {
-    val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
+    val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
     val df = dates.toDF("a").selectExpr("cast(a as date) as b")
     val executedPlan1 = df.queryExecution.executedPlan
     val result = HiveResult.hiveResultString(executedPlan1)
@@ -36,8 +36,8 @@ class HiveResultSuite extends SharedSparkSession {
   test("timestamp formatting in hive result") {
     val timestamps = Seq(
       "2018-12-28 01:02:03",
-      "1582-10-13 01:02:03",
-      "1582-10-14 01:02:03",
+      "1582-10-03 01:02:03",
+      "1582-10-04 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
     val executedPlan1 = df.queryExecution.executedPlan
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e..0dfed76 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.orc.storage.ql.exec.vector.*;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   public long getLong(int rowId) {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
-      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+      return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 2f1925e..35447fe 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.hadoop.hive.ql.exec.vector.*;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVecto
   public long getLong(int rowId) {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
-      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+      return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 0cd9b36..e217c52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.hive
 
 import java.lang.reflect.{ParameterizedType, Type, WildcardType}
-import java.util.concurrent.TimeUnit._
+import java.time.LocalDate
+import java.util.Calendar
 
 import scala.collection.JavaConverters._
 
@@ -181,6 +182,33 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 private[hive] trait HiveInspectors {
 
+  private final val JULIAN_CUTOVER_DAY =
+    rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)
+
+  private def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
+    val utcCal = new Calendar.Builder()
+      .setCalendarType("gregory")
+      .setTimeZone(DateTimeUtils.TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, DateTimeConstants.MILLIS_PER_DAY))
+  }
+
+  private def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
+    val millis = Math.multiplyExact(daysSinceEpoch, DateTimeConstants.MILLIS_PER_DAY)
+    val utcCal = new Calendar.Builder()
+      .setCalendarType("gregory")
+      .setTimeZone(DateTimeUtils.TimeZoneUTC)
+      .setInstant(millis)
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
   def javaTypeToDataType(clz: Type): DataType = clz match {
     // writable
     case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -466,7 +494,7 @@ private[hive] trait HiveInspectors {
         _ => constant
       case poi: WritableConstantTimestampObjectInspector =>
         val t = poi.getWritableConstantValue
-        val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+        val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp)
         _ => constant
       case poi: WritableConstantIntObjectInspector =>
         val constant = poi.getWritableConstantValue.get()
@@ -618,7 +646,14 @@ private[hive] trait HiveInspectors {
         case x: DateObjectInspector if x.preferWritable() =>
           data: Any => {
             if (data != null) {
-              DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+              // Rebasing written days via conversion to local dates.
+              // See the comment for `getDateWritable()`.
+              val daysSinceEpoch = x.getPrimitiveWritableObject(data).getDays
+              if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
+                rebaseJulianToGregorianDays(daysSinceEpoch)
+              } else {
+                daysSinceEpoch
+              }
             } else {
               null
             }
@@ -634,8 +669,7 @@ private[hive] trait HiveInspectors {
         case x: TimestampObjectInspector if x.preferWritable() =>
           data: Any => {
             if (data != null) {
-              val t = x.getPrimitiveWritableObject(data)
-              SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+              DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp)
             } else {
               null
             }
@@ -1012,7 +1046,27 @@ private[hive] trait HiveInspectors {
     }
 
   private def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+    if (value == null) {
+      null
+    } else {
+      // Rebasing days since the epoch to store the same number of days
+      // as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+      // Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+      // this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+      // Julian calendar for dates before 1582-10-15. So, the same local date may
+      // be mapped to different number of days since the epoch in different calendars.
+      // For example:
+      // Proleptic Gregorian calendar: 1582-01-01 -> -141714
+      // Julian calendar: 1582-01-01 -> -141704
+      // The code below converts -141714 to -141704.
+      val daysSinceEpoch = value.asInstanceOf[Int]
+      val rebasedDays = if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
+        rebaseGregorianToJulianDays(daysSinceEpoch)
+      } else {
+        daysSinceEpoch
+      }
+      new hiveIo.DateWritable(rebasedDays)
+    }
 
   private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {

