This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3d3e366  [SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time
3d3e366 is described below

commit 3d3e366aa836cb7d2295f54e78e544c7b15c9c08
Author: Maxim Gekk <max.g...@gmail.com>
AuthorDate: Wed Mar 11 20:53:56 2020 +0800

[SPARK-31076][SQL] Convert Catalyst's DATE/TIMESTAMP to Java Date/Timestamp via local date-time

### What changes were proposed in this pull request?
In this PR, I propose to change the conversion of java.sql.Timestamp/Date values to/from internal values of Catalyst's TimestampType/DateType for values before the Gregorian calendar cutover day `1582-10-15`. The conversion constructs a local date-time from the microseconds/days since the epoch, takes each date-time component (`year`, `month`, `day`, `hour`, `minute`, `second` and `second fraction`), and constructs java.sql.Timestamp/Date from the extracted components.

### Why are the changes needed?
This rebases the underlying time/date offset so that collected java.sql.Timestamp/Date values have the same local date-time components as the original values in the Gregorian calendar. Here is an example that demonstrates the issue:
```scala
scala> sql("select date '1100-10-10'").collect()
res1: Array[org.apache.spark.sql.Row] = Array([1100-10-03])
```

### Does this PR introduce any user-facing change?
Yes, after the changes:
```scala
scala> sql("select date '1100-10-10'").collect()
res0: Array[org.apache.spark.sql.Row] = Array([1100-10-10])
```

### How was this patch tested?
By running `DateTimeUtilsSuite`, `DateFunctionsSuite` and `DateExpressionsSuite`.

Closes #27807 from MaxGekk/rebase-timestamp-before-1582.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 39 +++++++++++--
 .../spark/sql/execution/HiveResultSuite.scala      |  6 +-
 .../execution/datasources/orc/OrcColumnVector.java |  3 +-
 .../execution/datasources/orc/OrcColumnVector.java |  3 +-
 .../org/apache/spark/sql/hive/HiveInspectors.scala | 66 ++++++++++++++++++++--
 5 files changed, 102 insertions(+), 15 deletions(-)
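The date shift in the example above comes from the mismatch between the Proleptic Gregorian calendar (java.time, used by Spark 3.0+ internally) and the hybrid Julian/Gregorian calendar behind java.sql.Date/Timestamp. Below is a minimal standalone Scala sketch of that mismatch, using the 1582-01-01 day numbers quoted in the `getDateWritable()` comment further down; the UTC day arithmetic and the names are illustrative, not part of the patch:

```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}

object CalendarGapDemo extends App {
  // Days since the epoch for 1582-01-01 in the Proleptic Gregorian
  // calendar used by java.time (and by Spark 3.0+ internally).
  val prolepticDays: Long = LocalDate.of(1582, 1, 1).toEpochDay // -141714

  // The same local date in the hybrid Julian/Gregorian calendar that
  // backs java.sql.Date/Timestamp (Julian before 1582-10-15).
  val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  cal.clear()
  cal.set(1582, 0, 1) // months are 0-based in java.util.Calendar
  val hybridDays: Long = Math.floorDiv(cal.getTimeInMillis, 86400000L) // -141704

  // The gap (10 days in 1582, 7 days in 1100) is why collect() could
  // return 1100-10-03 for the literal DATE '1100-10-10' before this patch.
  println(s"proleptic=$prolepticDays hybrid=$hybridDays gap=${hybridDays - prolepticDays}")
}
```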
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index de4c24e..9f207ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -47,6 +47,15 @@ object DateTimeUtils {
   // it's 2440587.5, rounding up to compatible with Hive
   final val JULIAN_DAY_OF_EPOCH = 2440588
 
+  final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
+  final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
+    LocalDateTime.of(1582, 10, 15, 0, 0, 0)
+      .atOffset(ZoneOffset.UTC)
+      .toInstant)
+  final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)
+
+  final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")
+
   final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
   final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
@@ -86,28 +95,50 @@ object DateTimeUtils {
    * Returns the number of days since epoch from java.sql.Date.
    */
   def fromJavaDate(date: Date): SQLDate = {
-    microsToDays(millisToMicros(date.getTime))
+    if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
+      val era = if (date.before(julianCommonEraStart)) 0 else 1
+      val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
+      localDateToDays(localDate)
+    } else {
+      microsToDays(millisToMicros(date.getTime))
+    }
   }
 
   /**
    * Returns a java.sql.Date from number of days since epoch.
    */
   def toJavaDate(daysSinceEpoch: SQLDate): Date = {
-    new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+    if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
+      Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
+    } else {
+      new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
+    }
   }
 
   /**
    * Returns a java.sql.Timestamp from number of micros since epoch.
    */
   def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
-    Timestamp.from(microsToInstant(us))
+    if (us < GREGORIAN_CUTOVER_MICROS) {
+      val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
+      Timestamp.valueOf(ldt)
+    } else {
+      Timestamp.from(microsToInstant(us))
+    }
   }
 
   /**
    * Returns the number of micros since epoch from java.sql.Timestamp.
    */
   def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
-    instantToMicros(t.toInstant)
+    if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
+      val era = if (t.before(julianCommonEraStart)) 0 else 1
+      val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
+      val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
+      instantToMicros(instant)
+    } else {
+      instantToMicros(t.toInstant)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
index bb59b12..bf7cbaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -23,7 +23,7 @@ class HiveResultSuite extends SharedSparkSession {
   import testImplicits._
 
   test("date formatting in hive result") {
-    val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
+    val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
     val df = dates.toDF("a").selectExpr("cast(a as date) as b")
     val executedPlan1 = df.queryExecution.executedPlan
     val result = HiveResult.hiveResultString(executedPlan1)
@@ -36,8 +36,8 @@ class HiveResultSuite extends SharedSparkSession {
   test("timestamp formatting in hive result") {
     val timestamps = Seq(
       "2018-12-28 01:02:03",
-      "1582-10-13 01:02:03",
-      "1582-10-14 01:02:03",
+      "1582-10-03 01:02:03",
+      "1582-10-04 01:02:03",
       "1582-10-15 01:02:03")
     val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
     val executedPlan1 = df.queryExecution.executedPlan
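The new pre-cutover branch of `toJavaDate` can be tried in isolation. Here is a simplified, self-contained Scala sketch: the else branch uses plain UTC day arithmetic instead of Spark's `daysToMicros` with the session time zone, and the cutover constant is recomputed locally rather than taken from `DateTimeUtils`:

```scala
import java.sql.Date
import java.time.LocalDate

object ToJavaDateSketch extends App {
  val gregorianCutoverDay: Long = LocalDate.of(1582, 10, 15).toEpochDay

  // Pre-cutover: rebuild the Date from local date components via
  // Date.valueOf, so year/month/day survive java.sql.Date's switch to
  // the hybrid Julian calendar. Post-cutover: plain epoch arithmetic.
  def toJavaDate(daysSinceEpoch: Int): Date =
    if (daysSinceEpoch < gregorianCutoverDay) {
      Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
    } else {
      new Date(daysSinceEpoch * 86400000L)
    }

  val days = Math.toIntExact(LocalDate.of(1100, 10, 10).toEpochDay)
  println(toJavaDate(days)) // prints 1100-10-10, not 1100-10-03
}
```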
diff --git a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e..0dfed76 100644
--- a/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v1.2/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.orc.storage.ql.exec.vector.*;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector
   public long getLong(int rowId) {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
-      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+      return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
     }
diff --git a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 2f1925e..35447fe 100644
--- a/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.hadoop.hive.ql.exec.vector.*;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector
   public long getLong(int rowId) {
     int index = getRowIndex(rowId);
     if (isTimestamp) {
-      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
+      return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
     } else {
       return longData.vector[index];
    }
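For context on the replaced ORC arithmetic: the old expression builds microseconds directly from the vector's raw millisecond/nanosecond fields and is therefore calendar-oblivious, whereas going through `fromJavaTimestamp` lets pre-cutover values be rebased. A Scala sketch under stated assumptions (`timeMillis`/`nanos` stand in for `TimestampColumnVector.time`/`nanos`, and `instantToMicros` for the DateTimeUtils helper; only the post-cutover arm is shown):

```scala
import java.sql.Timestamp
import java.time.Instant

object OrcMicrosSketch extends App {
  // Old ORC path: nanos / 1000 % 1000 keeps only the sub-millisecond
  // microseconds, since whole milliseconds are already counted in
  // timeMillis. No calendar logic is involved.
  def oldGetMicros(timeMillis: Long, nanos: Int): Long =
    timeMillis * 1000 + nanos / 1000 % 1000

  // New path (post-cutover arm): Timestamp -> Instant -> micros, so the
  // real fromJavaTimestamp can additionally rebase pre-1582-10-15 values
  // via local date-time, as in the DateTimeUtils diff above.
  def instantToMicros(i: Instant): Long =
    Math.addExact(Math.multiplyExact(i.getEpochSecond, 1000000L), i.getNano / 1000)

  def newGetMicros(ts: Timestamp): Long = instantToMicros(ts.toInstant)

  val ts = Timestamp.valueOf("2018-12-28 01:02:03.123456")
  println(oldGetMicros(ts.getTime, ts.getNanos) == newGetMicros(ts)) // true for modern dates
}
```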
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 0cd9b36..e217c52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.hive
 
 import java.lang.reflect.{ParameterizedType, Type, WildcardType}
-import java.util.concurrent.TimeUnit._
+import java.time.LocalDate
+import java.util.Calendar
 
 import scala.collection.JavaConverters._
 
@@ -181,6 +182,33 @@ import org.apache.spark.unsafe.types.UTF8String
  */
private[hive] trait HiveInspectors {
 
+  private final val JULIAN_CUTOVER_DAY =
+    rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)
+
+  private def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
+    val utcCal = new Calendar.Builder()
+      .setCalendarType("gregory")
+      .setTimeZone(DateTimeUtils.TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, DateTimeConstants.MILLIS_PER_DAY))
+  }
+
+  private def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
+    val millis = Math.multiplyExact(daysSinceEpoch, DateTimeConstants.MILLIS_PER_DAY)
+    val utcCal = new Calendar.Builder()
+      .setCalendarType("gregory")
+      .setTimeZone(DateTimeUtils.TimeZoneUTC)
+      .setInstant(millis)
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
   def javaTypeToDataType(clz: Type): DataType = clz match {
     // writable
     case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -466,7 +494,7 @@ private[hive] trait HiveInspectors {
       _ => constant
     case poi: WritableConstantTimestampObjectInspector =>
       val t = poi.getWritableConstantValue
-      val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+      val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp)
       _ => constant
     case poi: WritableConstantIntObjectInspector =>
       val constant = poi.getWritableConstantValue.get()
@@ -618,7 +646,14 @@ private[hive] trait HiveInspectors {
     case x: DateObjectInspector if x.preferWritable() =>
       data: Any => {
         if (data != null) {
-          DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
+          // Rebasing written days via conversion to local dates.
+          // See the comment for `getDateWritable()`.
+          val daysSinceEpoch = x.getPrimitiveWritableObject(data).getDays
+          if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
+            rebaseJulianToGregorianDays(daysSinceEpoch)
+          } else {
+            daysSinceEpoch
+          }
         } else {
           null
         }
@@ -634,8 +669,7 @@ private[hive] trait HiveInspectors {
     case x: TimestampObjectInspector if x.preferWritable() =>
       data: Any => {
         if (data != null) {
-          val t = x.getPrimitiveWritableObject(data)
-          SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
+          DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp)
         } else {
           null
         }
@@ -1012,7 +1046,27 @@ private[hive] trait HiveInspectors {
   }
 
   private def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+    if (value == null) {
+      null
+    } else {
+      // Rebasing days since the epoch to store the same number of days
+      // as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+      // Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+      // this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+      // Julian calendar for dates before 1582-10-15. So, the same local date may
+      // be mapped to different number of days since the epoch in different calendars.
+      // For example:
+      //   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+      //   Julian calendar: 1582-01-01 -> -141704
+      // The code below converts -141714 to -141704.
+      val daysSinceEpoch = value.asInstanceOf[Int]
+      val rebasedDays = if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
+        rebaseGregorianToJulianDays(daysSinceEpoch)
+      } else {
+        daysSinceEpoch
+      }
+      new hiveIo.DateWritable(rebasedDays)
+    }
 
   private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
     if (value == null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org