Repository: spark Updated Branches: refs/heads/master 28fa01e2b -> a29081487
[SPARK-8866][SQL] use 1us precision for timestamp type JIRA: https://issues.apache.org/jira/browse/SPARK-8866 Author: Yijie Shen <henry.yijies...@gmail.com> Closes #7283 from yijieshen/micro_timestamp and squashes the following commits: dc735df [Yijie Shen] update CastSuite to avoid round error 714eaea [Yijie Shen] add timestamp_udf into blacklist due to precision lose c3ca2f4 [Yijie Shen] fix unhandled case in CurrentTimestamp 8d4aa6b [Yijie Shen] use 1us precision for timestamp type Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2908148 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2908148 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2908148 Branch: refs/heads/master Commit: a290814877308c6fa9b0f78b1a81145db7651ca4 Parents: 28fa01e Author: Yijie Shen <henry.yijies...@gmail.com> Authored: Wed Jul 8 20:20:17 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Wed Jul 8 20:20:17 2015 -0700 ---------------------------------------------------------------------- python/pyspark/sql/types.py | 2 +- .../spark/sql/catalyst/expressions/Cast.scala | 18 +++++----- .../expressions/datetimeFunctions.scala | 2 +- .../spark/sql/catalyst/util/DateTimeUtils.scala | 38 ++++++++++---------- .../sql/catalyst/expressions/CastSuite.scala | 10 +++--- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 8 ++--- .../apache/spark/sql/json/JacksonParser.scala | 4 +-- .../org/apache/spark/sql/json/JsonRDD.scala | 6 ++-- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../hive/execution/HiveCompatibilitySuite.scala | 6 ++-- .../apache/spark/sql/hive/HiveInspectors.scala | 4 +-- 11 files changed, 50 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/python/pyspark/sql/types.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 7e64cb0..fecfe6d 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -775,7 +775,7 @@ def _python_to_sql_converter(dataType): if dt: seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())) - return int(seconds * 1e7 + dt.microsecond * 10) + return int(seconds * 1e6 + dt.microsecond) return to_posix_timstamp else: http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 662ceec..567feca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -186,7 +186,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 10000) + buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 1000) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -207,16 +207,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w } private[this] def decimalToTimestamp(d: Decimal): Long = { - (d.toBigDecimal * 10000000L).longValue() + (d.toBigDecimal * 1000000L).longValue() } - // converting milliseconds to 100ns - private[this] def longToTimestamp(t: Long): Long = t * 10000L - // converting 100ns to seconds - private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 10000000L).toLong - // converting 100ns to seconds in double + // converting milliseconds to us + private[this] def longToTimestamp(t: Long): Long = t * 1000L + // converting us to seconds + private[this] def timestampToLong(ts: Long): Long = math.floor(ts.toDouble / 1000000L).toLong + // converting us to seconds in double private[this] def timestampToDouble(ts: Long): Double = { - ts / 10000000.0 + ts / 1000000.0 } // DateConverter @@ -229,7 +229,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L)) + buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L)) // Hive throws this exception as a Semantic Exception // It is never possible to compare result when hive return with exception, // so we can return null http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala index a492b96..dd5ec33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala @@ -51,6 +51,6 @@ case class CurrentTimestamp() extends LeafExpression { override def dataType: DataType = TimestampType override def eval(input: InternalRow): Any = { - System.currentTimeMillis() * 10000L + System.currentTimeMillis() * 1000L } } http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 4269ad5..c1ddee3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -34,8 +34,8 @@ object DateTimeUtils { // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5 final val SECONDS_PER_DAY = 60 * 60 * 24L - final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L - final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100 + final val MICROS_PER_SECOND = 1000L * 1000L + final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. @@ -77,8 +77,8 @@ object DateTimeUtils { threadLocalDateFormat.get.format(toJavaDate(days)) // Converts Timestamp to string according to Hive TimestampWritable convention. - def timestampToString(num100ns: Long): String = { - val ts = toJavaTimestamp(num100ns) + def timestampToString(us: Long): String = { + val ts = toJavaTimestamp(us) val timestampString = ts.toString val formatted = threadLocalTimestampFormat.get.format(ts) @@ -132,52 +132,52 @@ object DateTimeUtils { } /** - * Returns a java.sql.Timestamp from number of 100ns since epoch. + * Returns a java.sql.Timestamp from number of micros since epoch. */ - def toJavaTimestamp(num100ns: Long): Timestamp = { + def toJavaTimestamp(us: Long): Timestamp = { // setNanos() will overwrite the millisecond part, so the milliseconds should be // cut off at seconds - var seconds = num100ns / HUNDRED_NANOS_PER_SECOND - var nanos = num100ns % HUNDRED_NANOS_PER_SECOND + var seconds = us / MICROS_PER_SECOND + var micros = us % MICROS_PER_SECOND // setNanos() can not accept negative value - if (nanos < 0) { - nanos += HUNDRED_NANOS_PER_SECOND + if (micros < 0) { + micros += MICROS_PER_SECOND seconds -= 1 } val t = new Timestamp(seconds * 1000) - t.setNanos(nanos.toInt * 100) + t.setNanos(micros.toInt * 1000) t } /** - * Returns the number of 100ns since epoch from java.sql.Timestamp. + * Returns the number of micros since epoch from java.sql.Timestamp. */ def fromJavaTimestamp(t: Timestamp): Long = { if (t != null) { - t.getTime() * 10000L + (t.getNanos().toLong / 100) % 10000L + t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L } else { 0L } } /** - * Returns the number of 100ns (hundred of nanoseconds) since epoch from Julian day + * Returns the number of microseconds since epoch from Julian day * and nanoseconds in a day */ def fromJulianDay(day: Int, nanoseconds: Long): Long = { // use Long to avoid rounding errors val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2 - seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L + seconds * MICROS_PER_SECOND + nanoseconds / 1000L } /** - * Returns Julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds) + * Returns Julian day and nanoseconds in a day from the number of microseconds */ - def toJulianDay(num100ns: Long): (Int, Long) = { - val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2 + def toJulianDay(us: Long): (Int, Long) = { + val seconds = us / MICROS_PER_SECOND + SECONDS_PER_DAY / 2 val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH val secondsInDay = seconds % SECONDS_PER_DAY - val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L + val nanos = (us % MICROS_PER_SECOND) * 1000L (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 518961e..919fdd4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -293,15 +293,15 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("cast from timestamp") { - val millis = 15 * 1000 + 2 - val seconds = millis * 1000 + 2 + val millis = 15 * 1000 + 3 + val seconds = millis * 1000 + 3 val ts = new Timestamp(millis) val tss = new Timestamp(seconds) checkEvaluation(cast(ts, ShortType), 15.toShort) checkEvaluation(cast(ts, IntegerType), 15) checkEvaluation(cast(ts, LongType), 15.toLong) - checkEvaluation(cast(ts, FloatType), 15.002f) - checkEvaluation(cast(ts, DoubleType), 15.002) + checkEvaluation(cast(ts, FloatType), 15.003f) + checkEvaluation(cast(ts, DoubleType), 15.003) checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) @@ -317,7 +317,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { Decimal(1)) // A test for higher precision than millis - checkEvaluation(cast(cast(0.0000001, TimestampType), DoubleType), 0.0000001) + checkEvaluation(cast(cast(0.000001, TimestampType), DoubleType), 0.000001) checkEvaluation(cast(Double.NaN, TimestampType), null) checkEvaluation(cast(1.0 / 0.0, TimestampType), null) http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 1d4a60c..f63ac19 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -24,11 +24,11 @@ import org.apache.spark.SparkFunSuite class DateTimeUtilsSuite extends SparkFunSuite { - test("timestamp and 100ns") { + test("timestamp and us") { val now = new Timestamp(System.currentTimeMillis()) - now.setNanos(100) + now.setNanos(1000) val ns = DateTimeUtils.fromJavaTimestamp(now) - assert(ns % 10000000L === 1) + assert(ns % 1000000L === 1) assert(DateTimeUtils.toJavaTimestamp(ns) === now) List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t => @@ -38,7 +38,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { } } - test("100ns and julian day") { + test("us and julian day") { val (d, ns) = DateTimeUtils.toJulianDay(0) assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH) assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND) http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index 4b8ab63..381e7ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -67,10 +67,10 @@ private[sql] object JacksonParser { DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) case (VALUE_STRING, TimestampType) => - DateTimeUtils.stringToTime(parser.getText).getTime * 10000L + DateTimeUtils.stringToTime(parser.getText).getTime * 1000L case (VALUE_NUMBER_INT, TimestampType) => - parser.getLongValue * 10000L + parser.getLongValue * 1000L case (_, StringType) => val writer = new ByteArrayOutputStream() http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 01ba05c..b392a51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -401,9 +401,9 @@ private[sql] object JsonRDD extends Logging { private def toTimestamp(value: Any): Long = { value match { - case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L - case value: java.lang.Long => value * 10000L - case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L + case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 1000L + case value: java.lang.Long => value * 1000L + case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 1000L } } http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 69ab1c2..566a52d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -326,7 +326,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter { assert(cal.get(Calendar.HOUR) === 11) assert(cal.get(Calendar.MINUTE) === 22) assert(cal.get(Calendar.SECOND) === 33) - assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543500) + assert(rows(0).getAs[java.sql.Timestamp](2).getNanos === 543543000) } test("test DATE types") { http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 415a816..c884c39 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -254,9 +254,10 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // the answer is sensitive for jdk version "udf_java_method", - // Spark SQL use Long for TimestampType, lose the precision under 100ns + // Spark SQL use Long for TimestampType, lose the precision under 1us "timestamp_1", - "timestamp_2" + "timestamp_2", + "timestamp_udf" ) /** @@ -803,7 +804,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "timestamp_comparison", "timestamp_lazy", "timestamp_null", - "timestamp_udf", "touch", "transform_ppr1", "transform_ppr2", http://git-wip-us.apache.org/repos/asf/spark/blob/a2908148/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 4cba175..a8f2ee3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -267,7 +267,7 @@ private[hive] trait HiveInspectors { poi.getWritableConstantValue.getHiveDecimal) case poi: WritableConstantTimestampObjectInspector => val t = poi.getWritableConstantValue - t.getSeconds * 10000000L + t.getNanos / 100L + t.getSeconds * 1000000L + t.getNanos / 1000L case poi: WritableConstantIntObjectInspector => poi.getWritableConstantValue.get() case poi: WritableConstantDoubleObjectInspector => @@ -332,7 +332,7 @@ private[hive] trait HiveInspectors { case x: DateObjectInspector => DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) case x: TimestampObjectInspector if x.preferWritable() => val t = x.getPrimitiveWritableObject(data) - t.getSeconds * 10000000L + t.getNanos / 100 + t.getSeconds * 1000000L + t.getNanos / 1000L case ti: TimestampObjectInspector => DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) case _ => pi.getPrimitiveJavaObject(data) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org